You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/08/31 16:24:53 UTC

[ignite] branch ignite-2.9-revert-12568 created (now ed52559)

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a change to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git.


      at ed52559  Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"

This branch includes the following new commits:

     new f1fcea2  Revert "IGNITE-12756 TcpCommunication SPI metrics improvement"
     new 3d57f23  Revert "IGNITE-12682 IgniteMessageFactoryImpl.registerCustom() method is removed as potentially dangerous"
     new ed52559  Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite] 01/03: Revert "IGNITE-12756 TcpCommunication SPI metrics improvement"

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit f1fcea2e8f788a3039fe91218b990c21c778f238
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Aug 31 18:59:34 2020 +0300

    Revert "IGNITE-12756 TcpCommunication SPI metrics improvement"
    
    This reverts commit 683f22e6
---
 .../internal/managers/GridManagerAdapter.java      |  18 ---
 .../managers/communication/GridIoManager.java      | 100 ++++++-------
 .../communication/IgniteMessageFactoryImpl.java    |  37 +----
 .../processors/resource/GridResourceIoc.java       |  21 ++-
 .../processors/resource/GridResourceProcessor.java |   4 +-
 .../internal/resources/MetricManagerResource.java  |  32 +++++
 .../org/apache/ignite/spi/IgniteSpiAdapter.java    |  33 +----
 .../org/apache/ignite/spi/IgniteSpiContext.java    |  36 -----
 .../tcp/TcpCommunicationMetricsListener.java       | 156 +++++++++------------
 .../spi/communication/tcp/TcpCommunicationSpi.java |  29 ++--
 .../IgniteMessageFactoryImplTest.java              |  47 +------
 .../ignite/testframework/GridSpiTestContext.java   |  30 +---
 .../testframework/junits/IgniteTestResources.java  |  16 ++-
 13 files changed, 199 insertions(+), 360 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index d10d774..d2952d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Consumer;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.expiry.TouchedExpiryPolicy;
@@ -63,7 +62,6 @@ import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -626,22 +624,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         ctx.discovery().resolveCommunicationError(node, err);
                     }
 
-                    @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
-                        return ctx.metric().registry(name);
-                    }
-
-                    @Override public void removeMetricRegistry(String name) {
-                        ctx.metric().remove(name);
-                    }
-
-                    @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
-                        return ctx.metric();
-                    }
-
-                    @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
-                        ctx.metric().addMetricRegistryCreationListener(lsnr);
-                    }
-
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 25581ce..552613f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -432,6 +432,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
+        CommunicationSpi<Serializable> spi = getSpi();
+
+        if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
+            getTcpCommunicationSpi().setConnectionRequestor(invConnHandler);
+
+        startSpi();
+
+        MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
+
+        ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
+                "Outbound messages queue size.");
+
+        ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
+
+        ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
+
+        ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
+                "Received messages count.");
+
+        ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
+
+        getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
+            @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
+                try {
+                    onMessage0(nodeId, (GridIoMessage)msg, msgC);
+                }
+                catch (ClassCastException ignored) {
+                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                        msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                        "which is illegal - make sure to send messages only via GridProjection API.");
+                }
+            }
+
+            @Override public void onDisconnected(UUID nodeId) {
+                for (GridDisconnectListener lsnr : disconnectLsnrs)
+                    lsnr.onNodeDisconnected(nodeId);
+            }
+
+            @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
+                try {
+                    onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
+                }
+                catch (ClassCastException ignored) {
+                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
+                        initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
+                        "which is illegal - make sure to send messages only via GridProjection API.");
+                }
+            }
+        });
+
         ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
 
         MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
@@ -481,56 +531,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         msgFactory = new IgniteMessageFactoryImpl(msgs);
 
-        CommunicationSpi<Serializable> spi = getSpi();
-
-        if ((CommunicationSpi<?>)spi instanceof TcpCommunicationSpi)
-            getTcpCommunicationSpi().setConnectionRequestor(invConnHandler);
-
-        startSpi();
-
-        MetricRegistry ioMetric = ctx.metric().registry(COMM_METRICS);
-
-        ioMetric.register(OUTBOUND_MSG_QUEUE_CNT, spi::getOutboundMessagesQueueSize,
-                "Outbound messages queue size.");
-
-        ioMetric.register(SENT_MSG_CNT, spi::getSentMessagesCount, "Sent messages count.");
-
-        ioMetric.register(SENT_BYTES_CNT, spi::getSentBytesCount, "Sent bytes count.");
-
-        ioMetric.register(RCVD_MSGS_CNT, spi::getReceivedMessagesCount,
-                "Received messages count.");
-
-        ioMetric.register(RCVD_BYTES_CNT, spi::getReceivedBytesCount, "Received bytes count.");
-
-        getSpi().setListener(commLsnr = new CommunicationListenerEx<Serializable>() {
-            @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
-                try {
-                    onMessage0(nodeId, (GridIoMessage)msg, msgC);
-                }
-                catch (ClassCastException ignored) {
-                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
-                            msg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
-                            "which is illegal - make sure to send messages only via GridProjection API.");
-                }
-            }
-
-            @Override public void onDisconnected(UUID nodeId) {
-                for (GridDisconnectListener lsnr : disconnectLsnrs)
-                    lsnr.onNodeDisconnected(nodeId);
-            }
-
-            @Override public void onChannelOpened(UUID rmtNodeId, Serializable initMsg, Channel channel) {
-                try {
-                    onChannelOpened0(rmtNodeId, (GridIoMessage)initMsg, channel);
-                }
-                catch (ClassCastException ignored) {
-                    U.error(log, "Communication manager received message of unknown type (will ignore): " +
-                            initMsg.getClass().getName() + ". Most likely GridCommunicationSpi is being used directly, " +
-                            "which is illegal - make sure to send messages only via GridProjection API.");
-                }
-            }
-        });
-
         if (log.isDebugEnabled())
             log.debug(startInfo());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 68ce797..957ef7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -45,15 +45,6 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
     /** Initialized flag. If {@code true} then new message type couldn't be registered. */
     private boolean initialized;
 
-    /** Min index of registered message supplier. */
-    private int minIdx = Integer.MAX_VALUE;
-
-    /** Max index of registered message supplier. */
-    private int maxIdx = -1;
-
-    /** Count of registered message suppliers. */
-    private int cnt;
-
     /**
      * Contructor.
      *
@@ -106,21 +97,15 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
 
         Supplier<Message> curr = msgSuppliers[idx];
 
-        if (curr == null) {
+        if (curr == null)
             msgSuppliers[idx] = supplier;
-
-            minIdx = Math.min(idx, minIdx);
-
-            maxIdx = Math.max(idx, maxIdx);
-
-            cnt++;
-        }
         else
             throw new IgniteException("Message factory is already registered for direct type: " + directType);
     }
 
     /**
      * Creates new message instance of provided direct type.
+     * <p>
      *
      * @param directType Message direct type.
      * @return Message instance.
@@ -136,24 +121,6 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
     }
 
     /**
-     * Returns direct types of all registered messages.
-     *
-     * @return Direct types of all registered messages.
-     */
-    public short[] registeredDirectTypes() {
-        short[] res = new short[cnt];
-
-        if (cnt > 0) {
-            for (int i = minIdx, p = 0; i <= maxIdx; i++) {
-                if (msgSuppliers[i] != null)
-                    res[p++] = indexToDirectType(i);
-            }
-        }
-
-        return res;
-    }
-
-    /**
      * @param directType Direct type.
      */
     private static int directTypeToIndex(short directType) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 4b43e04..cecaf7e86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.GridLeanIdentitySet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -131,7 +132,7 @@ public class GridResourceIoc {
                 break;
 
             if (dep != null) {
-                Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.newCSet());
+                Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet());
 
                 classes.add(cls);
 
@@ -264,7 +265,7 @@ public class GridResourceIoc {
 
             boolean allowImplicitInjection = !GridNoImplicitInjection.class.isAssignableFrom(cls);
 
-            for (Class<?> cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
+            for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
                 for (Field field : cls0.getDeclaredFields()) {
                     Annotation[] fieldAnns = field.getAnnotations();
 
@@ -272,7 +273,9 @@ public class GridResourceIoc {
                         T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
 
                         if (t2 == null) {
-                            t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
+                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+                                new ArrayList<GridResourceField>(),
+                                new ArrayList<GridResourceMethod>());
 
                             annMap.put(ann.annotationType(), t2);
                         }
@@ -295,7 +298,9 @@ public class GridResourceIoc {
                         T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
 
                         if (t2 == null) {
-                            t2 = new T2<>(new ArrayList<>(), new ArrayList<>());
+                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+                                new ArrayList<GridResourceField>(),
+                                new ArrayList<GridResourceMethod>());
 
                             annMap.put(ann.annotationType(), t2);
                         }
@@ -506,7 +511,10 @@ public class GridResourceIoc {
         JOB_CONTEXT(JobContextResource.class),
 
         /** */
-        CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+        CACHE_STORE_SESSION(CacheStoreSessionResource.class),
+
+        /** */
+        METRIC_MANAGER(MetricManagerResource.class);
 
         /** */
         public final Class<? extends Annotation> clazz;
@@ -529,7 +537,8 @@ public class GridResourceIoc {
             ResourceAnnotation.SPRING,
             ResourceAnnotation.IGNITE_INSTANCE,
             ResourceAnnotation.LOGGER,
-            ResourceAnnotation.SERVICE
+            ResourceAnnotation.SERVICE,
+            ResourceAnnotation.METRIC_MANAGER
         ),
 
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 461b8d9..ca51984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -73,6 +73,8 @@ public class GridResourceProcessor extends GridProcessorAdapter {
             new GridResourceLoggerInjector(ctx.config().getGridLogger());
         injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] =
             new GridResourceBasicInjector<>(ctx.grid());
+        injectorByAnnotation[GridResourceIoc.ResourceAnnotation.METRIC_MANAGER.ordinal()] =
+            new GridResourceSupplierInjector<>(ctx::metric);
     }
 
     /** {@inheritDoc} */
@@ -391,7 +393,7 @@ public class GridResourceProcessor extends GridProcessorAdapter {
         injectToJob(dep, taskCls, obj, ses, jobCtx);
 
         if (obj instanceof GridInternalWrapper) {
-            Object usrObj = ((GridInternalWrapper<?>)obj).userObject();
+            Object usrObj = ((GridInternalWrapper)obj).userObject();
 
             if (usrObj != null)
                 injectToJob(dep, taskCls, usrObj, ses, jobCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
new file mode 100644
index 0000000..21cddd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.resources;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.FIELD})
+public @interface MetricManagerResource {
+    // No-op.
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c2bb86b..102e5e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -35,7 +34,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -56,11 +54,9 @@ import org.apache.ignite.plugin.security.SecuritySubject;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
-import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 
 /**
@@ -665,7 +661,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
                     "'IgniteConfiguration.metricsUpdateFrequency' to prevent unnecessary status checking.");
         }
         // Intentionally compare references using '!=' below
-        else if (ignite.configuration().getFailureDetectionTimeout() != DFLT_FAILURE_DETECTION_TIMEOUT)
+        else if (ignite.configuration().getFailureDetectionTimeout() !=
+                IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT)
             log.warning("Failure detection timeout will be ignored (one of SPI parameters has been set explicitly)");
 
         clientFailureDetectionTimeout = ignite.configuration().getClientFailureDetectionTimeout();
@@ -841,7 +838,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
 
         /** {@inheritDoc} */
         @Override public Collection<ClusterNode> nodes() {
-            return locNode == null ? Collections.emptyList() : Collections.singletonList(locNode);
+            return locNode == null ? Collections.<ClusterNode>emptyList() : Collections.singletonList(locNode);
         }
 
         /** {@inheritDoc} */
@@ -946,7 +943,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
             if (!(ignite0 instanceof IgniteKernal))
                 throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
 
-            ((IgniteEx)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+            ((IgniteKernal)ignite0).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
@@ -956,7 +953,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
             if (!(ignite0 instanceof IgniteKernal))
                 throw new IgniteSpiException("Wrong Ignite instance is set: " + ignite0);
 
-            ((IgniteEx)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+            ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
 
         /** {@inheritDoc} */
@@ -973,25 +970,5 @@ public abstract class IgniteSpiAdapter implements IgniteSpi {
         @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
             throw new UnsupportedOperationException();
         }
-
-        /** {@inheritDoc} */
-        @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeMetricRegistry(String name) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
-            // No-op.
-        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 9c08aca..d4402f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Consumer;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -29,12 +28,10 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.security.SecuritySubject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -379,37 +376,4 @@ public interface IgniteSpiContext {
      * @param err Error.
      */
     public void resolveCommunicationFailure(ClusterNode node, Exception err);
-
-    /**
-     * Returns exisiting or newly created instance of metric registry with given name.
-     *
-     * @param name Metric registry name.
-     * @return Exisiting or newly created instance of metric registry.
-     */
-    @IgniteExperimental
-    public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name);
-
-    /**
-     * Removes metric registry with given name.
-     *
-     * @param name Metric registry name.
-     */
-    @IgniteExperimental
-    public void removeMetricRegistry(String name);
-
-    /**
-     * Returns all registered metric registries.
-     *
-     * @return All registered metric registries.
-     */
-    @IgniteExperimental
-    public Iterable<ReadOnlyMetricRegistry> metricRegistries();
-
-    /**
-     * Register listener which will be notified on metric registry creation.
-     *
-     * @param lsnr Listener.
-     */
-    @IgniteExperimental
-    public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index 86608fc..afece1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -27,15 +27,10 @@ import java.util.function.Function;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
-import org.apache.ignite.internal.util.collection.IntHashMap;
-import org.apache.ignite.internal.util.collection.IntMap;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.Metric;
 import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
@@ -65,15 +60,15 @@ import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_M
  * Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
  */
 class TcpCommunicationMetricsListener {
-    /** SPI context. */
-    private final IgniteSpiContext spiCtx;
+    /** Metrics manager. */
+    private final GridMetricManager mmgr;
+
+    /** Metrics registry. */
+    private final org.apache.ignite.internal.processors.metric.MetricRegistry mreg;
 
     /** Current ignite instance. */
     private final Ignite ignite;
 
-    /** Metrics registry. */
-    private final MetricRegistry mreg;
-
     /** All registered metrics. */
     private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -86,6 +81,12 @@ class TcpCommunicationMetricsListener {
         return metrics;
     });
 
+    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByType}. */
+    private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
+
+    /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */
+    private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
+
     /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code #sentMsgsMetricsByConsistentId}. */
     private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
 
@@ -104,37 +105,35 @@ class TcpCommunicationMetricsListener {
     /** Received messages count metric. */
     private final LongAdderMetric rcvdMsgsMetric;
 
-    /** Counters of sent and received messages by direct type. */
-    private final IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType;
-
     /** Method to synchronize access to message type map. */
-    private final Object msgTypeMapMux = new Object();
+    private final Object msgTypMapMux = new Object();
 
     /** Message type map. */
     private volatile Map<Short, String> msgTypeMap;
 
     /** */
-    public TcpCommunicationMetricsListener(Ignite ignite, IgniteSpiContext spiCtx) {
+    public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
+        this.mmgr = mmgr;
         this.ignite = ignite;
-        this.spiCtx = spiCtx;
-
-        mreg = (MetricRegistry)spiCtx.getOrCreateMetricRegistry(COMMUNICATION_METRICS_GROUP_NAME);
 
-        msgCntrsByType = createMessageCounters((IgniteMessageFactory)spiCtx.messageFactory());
+        mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME);
 
-        sentMsgsCntByConsistentIdMetricFactory = consistentId -> {
-            String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
+        sentMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
+            sentMessagesByTypeMetricName(directType),
+            SENT_MESSAGES_BY_TYPE_METRIC_DESC
+        );
+        rcvdMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric(
+            receivedMessagesByTypeMetricName(directType),
+            RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC
+        );
 
-            return spiCtx.getOrCreateMetricRegistry(name)
-                    .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
-        };
+        sentMsgsCntByConsistentIdMetricFactory = consistentId ->
+            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
+                .findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
 
-        rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> {
-            String name = metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString());
-
-            return spiCtx.getOrCreateMetricRegistry(name)
-                    .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
-        };
+        rcvdMsgsCntByConsistentIdMetricFactory = consistentId ->
+            mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()))
+                .findMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME);
 
         sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC);
         rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC);
@@ -142,49 +141,17 @@ class TcpCommunicationMetricsListener {
         sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC);
         rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC);
 
-        spiCtx.addMetricRegistryCreationListener(mreg -> {
+        mmgr.addMetricRegistryCreationListener(mreg -> {
             // Metrics for the specific nodes.
             if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR))
                 return;
 
-            ((MetricRegistry)mreg).longAdderMetric(
-                    SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
-                    SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
-            );
+            ((MetricRegistry)mreg).longAdderMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
 
-            ((MetricRegistry)mreg).longAdderMetric(
-                    RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME,
-                    RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC
-            );
+            ((MetricRegistry)mreg).longAdderMetric(RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_DESC);
         });
     }
 
-    /**
-     * Creates counters of sent and received messages by direct type.
-     *
-     * @param factory Message factory.
-     * @return Counters of sent and received messages grouped by direct type.
-     */
-    private IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> createMessageCounters(IgniteMessageFactory factory) {
-        IgniteMessageFactoryImpl msgFactory = (IgniteMessageFactoryImpl)factory;
-
-        short[] directTypes = msgFactory.registeredDirectTypes();
-
-        IntMap<IgniteBiTuple<LongAdderMetric, LongAdderMetric>> msgCntrsByType = new IntHashMap<>(directTypes.length);
-
-        for (short type : directTypes) {
-            LongAdderMetric sentCnt =
-                    mreg.longAdderMetric(sentMessagesByTypeMetricName(type), SENT_MESSAGES_BY_TYPE_METRIC_DESC);
-
-            LongAdderMetric rcvCnt =
-                    mreg.longAdderMetric(receivedMessagesByTypeMetricName(type), RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC);
-
-            msgCntrsByType.put(type, new IgniteBiTuple<>(sentCnt, rcvCnt));
-        }
-
-        return msgCntrsByType;
-    }
-
     /** Metrics registry. */
     public MetricRegistry metricRegistry() {
         return mreg;
@@ -318,10 +285,10 @@ class TcpCommunicationMetricsListener {
             if (metric.name().startsWith(prefix)) {
                 short directType = Short.parseShort(metric.name().substring(prefix.length()));
 
-                Map<Short, String> msgTypeMap0 = msgTypeMap;
+                Map<Short, String> msgTypMap0 = msgTypeMap;
 
-                if (msgTypeMap0 != null) {
-                    String typeName = msgTypeMap0.get(directType);
+                if (msgTypMap0 != null) {
+                    String typeName = msgTypMap0.get(directType);
 
                     if (typeName != null)
                         res.put(typeName, ((LongMetric)metric).value());
@@ -342,7 +309,7 @@ class TcpCommunicationMetricsListener {
 
         String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR;
 
-        for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
+        for (ReadOnlyMetricRegistry mreg : mmgr) {
             if (mreg.name().startsWith(mregPrefix)) {
                 String nodeConsIdStr = mreg.name().substring(mregPrefix.length());
 
@@ -375,7 +342,7 @@ class TcpCommunicationMetricsListener {
                 metric.reset();
         }
 
-        for (ReadOnlyMetricRegistry mreg : spiCtx.metricRegistries()) {
+        for (ReadOnlyMetricRegistry mreg : mmgr) {
             if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) {
                 mreg.findMetric(SENT_MESSAGES_BY_NODE_CONSISTENT_ID_METRIC_NAME).reset();
 
@@ -396,7 +363,7 @@ class TcpCommunicationMetricsListener {
             threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<>();
         }
 
-        spiCtx.removeMetricRegistry(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
+        mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
     }
 
     /**
@@ -407,24 +374,24 @@ class TcpCommunicationMetricsListener {
     private void updateMessageTypeMap(Message msg) {
         short typeId = msg.directType();
 
-        Map<Short, String> msgTypeMap0 = msgTypeMap;
+        Map<Short, String> msgTypMap0 = msgTypeMap;
 
-        if (msgTypeMap0 == null || !msgTypeMap0.containsKey(typeId)) {
-            synchronized (msgTypeMapMux) {
+        if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
+            synchronized (msgTypMapMux) {
                 if (msgTypeMap == null) {
-                    msgTypeMap0 = new HashMap<>();
+                    msgTypMap0 = new HashMap<>();
 
-                    msgTypeMap0.put(typeId, msg.getClass().getName());
+                    msgTypMap0.put(typeId, msg.getClass().getName());
 
-                    msgTypeMap = msgTypeMap0;
+                    msgTypeMap = msgTypMap0;
                 }
                 else {
                     if (!msgTypeMap.containsKey(typeId)) {
-                        msgTypeMap0 = new HashMap<>(msgTypeMap);
+                        msgTypMap0 = new HashMap<>(msgTypeMap);
 
-                        msgTypeMap0.put(typeId, msg.getClass().getName());
+                        msgTypMap0.put(typeId, msg.getClass().getName());
 
-                        msgTypeMap = msgTypeMap0;
+                        msgTypeMap = msgTypMap0;
                     }
                 }
             }
@@ -445,22 +412,29 @@ class TcpCommunicationMetricsListener {
      * Thread-local metrics.
      */
     private class ThreadMetrics {
-        /** Sent messages count metrics grouped by message node consistent id. */
-        volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
+        /** Sent messages count metrics grouped by message type. */
+        private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<>();
+
+        /** Received messages count metrics grouped by message type. */
+        private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<>();
 
-        /** Received messages metrics count grouped by message node consistent id. */
-        volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
+        /**
+         * Sent messages count metrics grouped by message node consistent id.
+         */
+        public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<>();
+
+        /**
+         * Received messages metrics count grouped by message node consistent id.
+         */
+        public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<>();
 
         /**
          * Collects statistics for message sent by SPI.
-         *
          * @param msg Sent message.
          * @param consistentId Receiver node consistent id.
          */
         private void onMessageSent(Message msg, Object consistentId) {
-            IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
-
-            cnts.get1().increment();
+            sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment();
 
             sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, sentMsgsCntByConsistentIdMetricFactory).increment();
         }
@@ -471,9 +445,7 @@ class TcpCommunicationMetricsListener {
          * @param consistentId Sender node consistent id.
          */
         private void onMessageReceived(Message msg, Object consistentId) {
-            IgniteBiTuple<LongAdderMetric, LongAdderMetric> cnts = msgCntrsByType.get(msg.directType());
-
-            cnts.get2().increment();
+            rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment();
 
             rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, rcvdMsgsCntByConsistentIdMetricFactory).increment();
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 59f22ae..862f149 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.NoopTracing;
 import org.apache.ignite.internal.processors.tracing.SpanTags;
 import org.apache.ignite.internal.processors.tracing.Tracing;
+import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -1422,6 +1424,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         }
     }
 
+    /** */
+    @MetricManagerResource
+    private void injectMetricManager(GridMetricManager mmgr) {
+        if (mmgr != null)
+            metricsLsnr = new TcpCommunicationMetricsListener(mmgr, ignite);
+    }
+
     /**
      * Sets local host address for socket binding. Note that one node could have
      * additional addresses beside the loopback one. This configuration
@@ -2085,37 +2094,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
     /** {@inheritDoc} */
     @Override public int getSentMessagesCount() {
-        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
-        if (metricsLsnr == null)
-            return 0;
-
         return metricsLsnr.sentMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getSentBytesCount() {
-        // Listener could be not initialized yet, but discovery thread clould try to aggregate metrics.
-        if (metricsLsnr == null)
-            return 0;
-
         return metricsLsnr.sentBytesCount();
     }
 
     /** {@inheritDoc} */
     @Override public int getReceivedMessagesCount() {
-        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
-        if (metricsLsnr == null)
-            return 0;
-
         return metricsLsnr.receivedMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getReceivedBytesCount() {
-        // Listener could be not initialized yet, but discovery thread could try to aggregate metrics.
-        if (metricsLsnr == null)
-            return 0;
-
         return metricsLsnr.receivedBytesCount();
     }
 
@@ -2504,8 +2497,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
 
         ctxInitLatch.countDown();
-
-        metricsLsnr = new TcpCommunicationMetricsListener(ignite, spiCtx);
     }
 
     /** {@inheritDoc} */
@@ -5323,7 +5314,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /**
      *
      */
-    private static class ConnectGateway {
+    private class ConnectGateway {
         /** */
         private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
index 4ad4040..c55cc0d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -42,9 +41,6 @@ public class IgniteMessageFactoryImplTest {
     /** Test message 2 type. */
     private static final short TEST_MSG_2_TYPE = 2;
 
-    /** Test message 42 type. */
-    private static final short TEST_MSG_42_TYPE = 42;
-
     /** Unknown message type. */
     private static final short UNKNOWN_MSG_TYPE = 0;
 
@@ -67,7 +63,7 @@ public class IgniteMessageFactoryImplTest {
     public void testCreate() {
         MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
 
-        IgniteMessageFactoryImpl msgFactory = new IgniteMessageFactoryImpl(factories);
+        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
 
         Message msg;
 
@@ -77,12 +73,8 @@ public class IgniteMessageFactoryImplTest {
         msg = msgFactory.create(TEST_MSG_2_TYPE);
         assertTrue(msg instanceof TestMessage2);
 
-        msg = msgFactory.create(TEST_MSG_42_TYPE);
-        assertTrue(msg instanceof TestMessage42);
-
-        short[] directTypes = msgFactory.registeredDirectTypes();
-
-        assertArrayEquals(directTypes, new short[] {TEST_MSG_1_TYPE, TEST_MSG_2_TYPE, TEST_MSG_42_TYPE});
+        msg = msgFactory.create(TEST_MSG_2_TYPE);
+        assertTrue(msg instanceof TestMessage2);
     }
 
     /**
@@ -119,7 +111,6 @@ public class IgniteMessageFactoryImplTest {
         /** {@inheritDoc} */
         @Override public void registerAll(IgniteMessageFactory factory) {
             factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
-            factory.register(TEST_MSG_42_TYPE, TestMessage42::new);
         }
     }
 
@@ -163,7 +154,7 @@ public class IgniteMessageFactoryImplTest {
 
         /** {@inheritDoc} */
         @Override public short directType() {
-            return TEST_MSG_1_TYPE;
+            return 1;
         }
 
         /** {@inheritDoc} */
@@ -191,35 +182,7 @@ public class IgniteMessageFactoryImplTest {
 
         /** {@inheritDoc} */
         @Override public short directType() {
-            return TEST_MSG_2_TYPE;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-    }
-
-    /** Test message. */
-    private static class TestMessage42 implements Message {
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return TEST_MSG_42_TYPE;
+            return 2;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index a15d7a0..fdea2b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,7 +29,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.Consumer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -46,14 +45,12 @@ import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -65,7 +62,6 @@ import org.apache.ignite.spi.IgniteSpiContext;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
-import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -94,7 +90,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     private final Map<ClusterNode, Serializable> sentMsgs = new HashMap<>();
 
     /** */
-    private final ConcurrentMap<String, Map<?, ?>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Map> cache = new ConcurrentHashMap<>();
 
     /** */
     private MessageFormatter formatter;
@@ -113,7 +109,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     @Override public Collection<ClusterNode> remoteNodes() {
         return rmtNodes;
     }
@@ -616,33 +611,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
         throw new UnsupportedOperationException();
     }
 
-    /** {@inheritDoc} */
-    @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
-        return new MetricRegistry(name, null, null, new NullLogger());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeMetricRegistry(String name) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
-        // No-op.
-    }
-
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.
      */
+    @SuppressWarnings("unchecked")
     private <K, V> Map<K, V> getOrCreateCache(String cacheName) {
         synchronized (cache) {
-            Map<K, V> map = (Map<K, V>)cache.get(cacheName);
+            Map<K, V> map = cache.get(cacheName);
 
             if (map == null)
                 cache.put(cacheName, map = new ConcurrentHashMap<>());
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
index f935e05e..a154fe5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.apache.ignite.internal.resources.MetricManagerResource;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.logger.NullLogger;
@@ -87,8 +88,10 @@ public class IgniteTestResources {
         return U.IGNITE_MBEANS_DISABLED ? null : ManagementFactory.getPlatformMBeanServer();
     }
 
-    /** */
-    public IgniteTestResources() {
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgniteTestResources() throws IgniteCheckedException {
         if (SensitiveInfoTestLoggerProxy.TEST_SENSITIVE)
             log = new SensitiveInfoTestLoggerProxy(rootLog.getLogger(getClass()), null, null, null);
         else
@@ -104,7 +107,7 @@ public class IgniteTestResources {
     /**
      * @param cfg Ignite configuration
      */
-    public IgniteTestResources(IgniteConfiguration cfg) {
+    public IgniteTestResources(IgniteConfiguration cfg) throws IgniteCheckedException {
         this.cfg = cfg;
         this.log = rootLog.getLogger(getClass());
         this.jmx = prepareMBeanServer();
@@ -115,7 +118,7 @@ public class IgniteTestResources {
     /**
      * @param jmx JMX server.
      */
-    public IgniteTestResources(MBeanServer jmx) {
+    public IgniteTestResources(MBeanServer jmx) throws IgniteCheckedException {
         assert jmx != null;
 
         this.jmx = jmx;
@@ -127,7 +130,7 @@ public class IgniteTestResources {
     /**
      * @param log Logger.
      */
-    public IgniteTestResources(IgniteLogger log) {
+    public IgniteTestResources(IgniteLogger log) throws IgniteCheckedException {
         assert log != null;
 
         this.log = log.getLogger(getClass());
@@ -164,7 +167,7 @@ public class IgniteTestResources {
      */
     public void startThreads(boolean prestart) {
         execSvc = new IgniteThreadPoolExecutor(nodeId.toString(), null, 40, 40, Long.MAX_VALUE,
-                new LinkedBlockingQueue<>());
+            new LinkedBlockingQueue<Runnable>());
 
         // Improve concurrency for testing.
         if (prestart)
@@ -191,6 +194,7 @@ public class IgniteTestResources {
         rsrcProc.injectBasicResource(target, LoggerResource.class, getLogger().getLogger(target.getClass()));
         rsrcProc.injectBasicResource(target, IgniteInstanceResource.class,
             new IgniteMock(null, locHost, nodeId, getMarshaller(), jmx, home, cfg));
+        rsrcProc.injectBasicResource(target, MetricManagerResource.class, ctx.metric());
     }
 
     /**


[ignite] 02/03: Revert "IGNITE-12682 IgniteMessageFactoryImpl.registerCustom() method is removed as potentially dangerous"

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 3d57f23a614595d246fc70c276ca8347ba868148
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Aug 31 19:02:59 2020 +0300

    Revert "IGNITE-12682 IgniteMessageFactoryImpl.registerCustom() method is removed as potentially dangerous"
    
    This reverts commit 2c428d53
---
 .../communication/IgniteMessageFactoryImpl.java    | 26 ++++++
 .../GridManagerLocalMessageListenerSelfTest.java   | 30 ++-----
 .../GridCommunicationSendMessageSelfTest.java      | 31 ++-----
 .../MessageDirectTypeIdConflictTest.java           | 96 ++++++++++++++++++++--
 .../GridCacheConditionalDeploymentSelfTest.java    | 32 ++------
 .../ignite/plugin/PluginConfigurationTest.java     | 92 ++++++++++++++++++++-
 .../GridAbstractCommunicationSelfTest.java         | 19 ++---
 .../communication/GridCacheMessageSelfTest.java    | 49 +++++------
 ...pCommunicationSpiConcurrentConnectSelfTest.java | 21 ++---
 ...idTcpCommunicationSpiMultithreadedSelfTest.java | 18 +---
 ...GridTcpCommunicationSpiRecoveryAckSelfTest.java | 21 ++---
 .../GridTcpCommunicationSpiRecoverySelfTest.java   | 21 ++---
 ...TcpCommunicationRecoveryAckClosureSelfTest.java | 21 ++---
 .../tcp/TcpCommunicationStatisticsTest.java        | 30 ++-----
 .../ignite/testframework/GridSpiTestContext.java   | 34 ++++----
 15 files changed, 309 insertions(+), 232 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 957ef7c..eb89043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.managers.communication;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
 
 import org.apache.ignite.IgniteException;
@@ -28,6 +30,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Message factory implementation which is responsible for instantiation of all communication messages.
@@ -39,6 +42,9 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
     /** Array size. */
     private static final int ARR_SIZE = 1 << Short.SIZE;
 
+    /** Custom messages registry. Used for test purposes. */
+    private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>();
+
     /** Message suppliers. */
     private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE);
 
@@ -115,6 +121,9 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
         Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];
 
         if (supplier == null)
+            supplier = CUSTOM.get(directType);
+
+        if (supplier == null)
             throw new IgniteException("Invalid message type: " + directType);
 
         return supplier.get();
@@ -137,4 +146,21 @@ public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
 
         return (short)res;
     }
+
+    /**
+     * Registers factory for custom message. Used for test purposes.
+     *
+     * @param type Message type.
+     * @param c Message producer.
+     *
+     * @deprecated Should be removed. Please don't use this method anymore.
+     * Consider using a plugin with its own message types instead.
+     */
+    @TestOnly
+    @Deprecated
+    public static void registerCustom(short type, Supplier<Message> c) {
+        assert c != null;
+
+        CUSTOM.put(type, c);
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 9b20690..6dd103e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -23,13 +23,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiContext;
@@ -50,12 +45,14 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
     /** */
     private static final short DIRECT_TYPE = 210;
 
+    static {
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
 
-        c.setPluginProviders(new TestPluginProvider());
-
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
 
         c.setCommunicationSpi(commSpi);
@@ -210,21 +207,4 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
             });
         }
     }
-
-    /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "TEST_PLUGIN";
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(DIRECT_TYPE, GridIoUserMessage::new);
-                }
-            });
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index f7f929e..0a75cf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -21,13 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -51,12 +45,16 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
     /** */
     private static final short DIRECT_TYPE_OVER_BYTE = 1000;
 
+    static {
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new);
+
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
 
-        c.setPluginProviders(new TestPluginProvider());
-
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
 
         c.setCommunicationSpi(commSpi);
@@ -214,21 +212,4 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
         }
     }
 
-    /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "TEST_PLUGIN";
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(DIRECT_TYPE, TestMessage::new);
-                    factory.register(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
-                }
-            });
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
index e112915..6046f4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
@@ -17,13 +17,22 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
 import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
 import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
 import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -31,6 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
@@ -40,6 +50,9 @@ import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
  * for which message factory is already registered.
  */
 public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
+    /** Test plugin name. */
+    private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN";
+
     /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */
     private static final short MSG_DIRECT_TYPE = -44;
 
@@ -71,27 +84,99 @@ public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
      * for which message factory is already registered.
      */
     @Test
-    @SuppressWarnings({"RedundantThrows", "ThrowableNotThrown"})
     public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception {
         assertThrows(log, (Callable<Object>)this::startGrid, IgniteCheckedException.class,
                 "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE);
     }
 
+    /** Plugin with own message factory. */
+    private static class TestPlugin implements IgnitePlugin {
+    }
+
     /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
+    public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
         /** {@inheritDoc} */
         @Override public String name() {
-            return "TEST_PLUGIN";
+            return TEST_PLUGIN_NAME;
+        }
+
+        /** {@inheritDoc} */
+        @Override  public <T extends IgnitePlugin> T plugin() {
+            return (T)new TestPlugin();
         }
 
         /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String version() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String copyright() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
             registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
                 @Override public void registerAll(IgniteMessageFactory factory) {
                     factory.register(MSG_DIRECT_TYPE, TestMessage::new);
                 }
             });
         }
+
+        /** {@inheritDoc} */
+        @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop(boolean cancel) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStart() throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStop(boolean cancel) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node, Serializable data) {
+            // No-op.
+        }
+    }
+
+    /** Empty plugin configuration; serves as the type argument of {@link TestPluginProvider}. */
+    private static class TestPluginConfiguration implements PluginConfiguration {
     }
 
     /** Test message with already registered direct type. */
@@ -121,4 +206,5 @@ public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
             // No-op.
         }
     }
+
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index 97fa68c..aaa42b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,13 +22,8 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -41,14 +36,19 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  * Cache + conditional deployment test.
  */
 public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTest {
+    /**
+     * Registers the {@link TestMessage} factory for its direct type when this class is initialized.
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setCacheConfiguration(cacheConfiguration());
 
-        cfg.setPluginProviders(new TestPluginProvider());
-
         return cfg;
     }
 
@@ -201,22 +201,6 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
 
     /** */
     private static class TestValue {
-    }
 
-    /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "TEST_PLUGIN";
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(TestMessage.DIRECT_TYPE, TestMessage::new);
-                }
-            });
-        }
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/plugin/PluginConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/plugin/PluginConfigurationTest.java
index 87a9142..7f46a74 100644
--- a/modules/core/src/test/java/org/apache/ignite/plugin/PluginConfigurationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/plugin/PluginConfigurationTest.java
@@ -17,22 +17,30 @@
 
 package org.apache.ignite.plugin;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.ServiceLoader;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 /**
  * Tests for Ignite plugin configuration.
  */
 public class PluginConfigurationTest extends GridCommonAbstractTest {
+    /** Test plugin name. */
+    private static final String TEST_PLUGIN_NAME = "test_plugin";
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
@@ -98,11 +106,91 @@ public class PluginConfigurationTest extends GridCommonAbstractTest {
         return col.stream().map(PluginProvider::name).collect(Collectors.toList());
     }
 
+    /** Empty test plugin returned by {@link TestPluginProvider#plugin()}. */
+    private static class TestPlugin implements IgnitePlugin {
+    }
+
     /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
+    @SuppressWarnings("RedundantThrows")
+    public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
         /** {@inheritDoc} */
         @Override public String name() {
-            return "test_plugin";
+            return TEST_PLUGIN_NAME;
+        }
+
+        /** {@inheritDoc} */
+        @Override  public <T extends IgnitePlugin> T plugin() {
+            return (T)new TestPlugin();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String version() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String copyright() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("rawtypes")
+        @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+            // No-op.
         }
+
+        /** {@inheritDoc} */
+        @Override public void stop(boolean cancel) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStart() throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStop(boolean cancel) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node, Serializable data) {
+            // No-op.
+        }
+    }
+
+    /** Empty plugin configuration; serves as the type argument of {@link TestPluginProvider}. */
+    private static class TestPluginConfiguration implements PluginConfiguration {
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 3ed071c..326b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -29,16 +29,12 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
@@ -80,6 +76,13 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     /** */
     protected boolean useSsl;
 
+    /**
+     * Registers the {@link GridTestMessage} factory for its direct type when this class is initialized.
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
     /** */
     private class MessageListener implements CommunicationListener<Message> {
         /** */
@@ -303,14 +306,6 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory}));
-
             ctx.setLocalNode(node);
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index bfce2ad..21a846d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -35,16 +35,11 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -66,6 +61,21 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
     /** */
     public static final String TEST_BODY = "Test body";
 
+    /**
+     * Registers a factory for each test message type below, keyed by its direct type, at class initialization.
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+
+        IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new);
+
+        IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new);
+
+        IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -84,8 +94,6 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
         cfg.setCacheConfiguration(ccfg);
 
-        cfg.setPluginProviders(new TestPluginProvider());
-
         return cfg;
     }
 
@@ -797,7 +805,9 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         }
     }
 
-    /** */
+    /**
+     *
+     */
     private static class TestFailureHandler extends AbstractFailureHandler {
         /** {@inheritDoc} */
         @Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
@@ -806,25 +816,4 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
             return false;
         }
     }
-
-    /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "TEST_PLUGIN";
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(TestMessage.DIRECT_TYPE, TestMessage::new);
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                    factory.register(TestMessage1.DIRECT_TYPE, TestMessage1::new);
-                    factory.register(TestMessage2.DIRECT_TYPE, TestMessage2::new);
-                    factory.register(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
-                }
-            });
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index e0db859..69e4bef 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -37,7 +37,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -46,10 +45,7 @@ import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
@@ -101,6 +97,13 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     private boolean pairedConnections = true;
 
     /**
+     * Registers the {@link GridTestMessage} factory for its direct type when this class is initialized.
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
+    /**
      * Disable SPI auto-start.
      */
     public GridTcpCommunicationSpiConcurrentConnectSelfTest() {
@@ -432,16 +435,6 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory})
-            );
-
             ctx.setLocalNode(node);
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 60bb2f0..f99df2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -50,10 +49,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
@@ -102,6 +98,10 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /** Flag indicating if listener should reject messages. */
     private static boolean reject;
 
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
     /**
      * @param useShmem Use shared mem.
      */
@@ -491,16 +491,6 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory})
-            );
-
             ctx.timeoutProcessor(timeoutProcessor);
 
             ctx.setLocalNode(node);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 89e300a..d99f48f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -37,10 +36,7 @@ import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
@@ -76,6 +72,13 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
     private static final int SPI_CNT = 2;
 
     /**
+     * Registers the {@link GridTestMessage} factory for its direct type when this class is initialized.
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
+    /**
      * Disable SPI auto-start.
      */
     public GridTcpCommunicationSpiRecoveryAckSelfTest() {
@@ -398,16 +401,6 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory})
-            );
-
             ctx.setLocalNode(node);
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index cbaf4f0..1d03590 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -41,10 +40,7 @@ import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
@@ -90,6 +86,13 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
     private static GridTimeoutProcessor timeoutProcessor;
 
     /**
+     *
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
+    /**
      * Disable SPI auto-start.
      */
     public GridTcpCommunicationSpiRecoverySelfTest() {
@@ -743,16 +746,6 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory})
-            );
-
             ctx.setLocalNode(node);
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index d6dd92a..ef9b413 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -40,10 +39,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.CommunicationListener;
@@ -80,6 +76,13 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     private static GridTimeoutProcessor timeoutProcessor;
 
     /**
+     *
+     */
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
+    /**
      * Disable SPI auto-start.
      */
     public IgniteTcpCommunicationRecoveryAckClosureSelfTest() {
@@ -451,16 +454,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             GridSpiTestContext ctx = initSpiContext();
 
-            MessageFactoryProvider testMsgFactory = new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            };
-
-            ctx.messageFactory(new IgniteMessageFactoryImpl(
-                    new MessageFactory[] {new GridIoMessageFactory(), testMsgFactory})
-            );
-
             ctx.setLocalNode(node);
 
             ctx.timeoutProcessor(timeoutProcessor);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 9a54271..26fb56b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -28,19 +28,14 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -60,6 +55,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
     /** */
     private final CountDownLatch latch = new CountDownLatch(1);
 
+    static {
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+    }
+
     /**
      * CommunicationSPI synchronized by {@code mux}.
      */
@@ -98,8 +97,6 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
 
         cfg.setCommunicationSpi(spi);
 
-        cfg.setPluginProviders(new TestPluginProvider());
-
         return cfg;
     }
 
@@ -208,21 +205,4 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
-
-    /** */
-    public static class TestPluginProvider extends AbstractTestPluginProvider {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return "TEST_PLUGIN";
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
-                }
-            });
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index fdea2b0..6424bfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -333,7 +334,13 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
-        addMessageListener(TOPIC_COMM_USER, new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        try {
+            addMessageListener(TOPIC_COMM_USER,
+                new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /**
@@ -356,7 +363,13 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
-        removeMessageListener(TOPIC_COMM_USER, new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        try {
+            removeMessageListener(TOPIC_COMM_USER,
+                new GridLocalMessageListener(topic, (IgniteBiPredicate<UUID, Object>)p));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /**
@@ -389,7 +402,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) {
-        Set<Integer> typeSet = F.addIfAbsent(evtLsnrs, lsnr, F.newSet());
+        Set<Integer> typeSet = F.addIfAbsent(evtLsnrs, lsnr, F.<Integer>newSet());
 
         assert typeSet != null;
 
@@ -560,15 +573,6 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return factory;
     }
 
-    /**
-     * Sets custom test message factory.
-     *
-     * @param factory Message factory.
-     */
-    public void messageFactory(MessageFactory factory) {
-        this.factory = factory;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean isStopping() {
         return false;
@@ -650,7 +654,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /**
      * This class represents a message listener wrapper that knows about peer deployment.
      */
-    private static class GridLocalMessageListener implements GridMessageListener {
+    private class GridLocalMessageListener implements GridMessageListener {
         /** Predicate listeners. */
         private final IgniteBiPredicate<UUID, Object> predLsnr;
 
@@ -660,8 +664,10 @@ public class GridSpiTestContext implements IgniteSpiContext {
         /**
          * @param topic User topic.
          * @param predLsnr Predicate listener.
+         * @throws IgniteCheckedException If failed to inject resources to predicates.
          */
-        GridLocalMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr) {
+        GridLocalMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID, Object> predLsnr)
+            throws IgniteCheckedException {
             this.topic = topic;
             this.predLsnr = predLsnr;
         }


[ignite] 03/03: Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit ed52559eb95c913e4b6ebc1b334f60c27ddbac26
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Aug 31 19:24:24 2020 +0300

    Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"
    
    This reverts commit 65c30ec6
---
 .../managers/communication/GridIoManager.java      |    4 +-
 .../communication/GridIoMessageFactory.java        | 1127 ++++++++++++++++----
 .../communication/IgniteMessageFactoryImpl.java    |  166 ---
 .../communication/IgniteMessageFactory.java        |   39 -
 .../extensions/communication/MessageFactory.java   |    3 -
 .../communication/MessageFactoryProvider.java      |   46 -
 .../tcp/TcpCommunicationMetricsListener.java       |   16 +-
 .../org.apache.ignite.plugin.PluginProvider        |    1 -
 .../GridManagerLocalMessageListenerSelfTest.java   |   14 +-
 .../GridCommunicationSendMessageSelfTest.java      |   19 +-
 .../IgniteMessageFactoryImplTest.java              |  198 ----
 .../MessageDirectTypeIdConflictTest.java           |  210 ----
 .../GridCacheConditionalDeploymentSelfTest.java    |   22 +-
 ...niteCacheContinuousQueryImmutableEntryTest.java |    8 +-
 .../GridAbstractCommunicationSelfTest.java         |   17 +-
 .../communication/GridCacheMessageSelfTest.java    |   61 +-
 .../tcp/GridTcpCommunicationSpiAbstractTest.java   |   16 +-
 ...pCommunicationSpiConcurrentConnectSelfTest.java |   21 +-
 ...idTcpCommunicationSpiMultithreadedSelfTest.java |    9 +-
 ...GridTcpCommunicationSpiRecoveryAckSelfTest.java |   17 +-
 .../GridTcpCommunicationSpiRecoverySelfTest.java   |   13 +-
 ...TcpCommunicationRecoveryAckClosureSelfTest.java |   19 +-
 .../tcp/TcpCommunicationStatisticsTest.java        |   35 +-
 .../ignite/testframework/GridSpiTestContext.java   |    3 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java    |    5 -
 .../ignite/util/GridMessageCollectionTest.java     |    5 +-
 .../h2/twostep/msg/GridH2ValueMessageFactory.java  |  129 ++-
 27 files changed, 1220 insertions(+), 1003 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 552613f..28c5881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -517,8 +517,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         List<MessageFactory> compMsgs = new ArrayList<>();
 
-        compMsgs.add(new GridIoMessageFactory());
-
         for (IgniteComponentType compType : IgniteComponentType.values()) {
             MessageFactory f = compType.messageFactory();
 
@@ -529,7 +527,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (!compMsgs.isEmpty())
             msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
 
-        msgFactory = new IgniteMessageFactoryImpl(msgs);
+        msgFactory = new GridIoMessageFactory(msgs);
 
         if (log.isDebugEnabled())
             log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 84a84cd..69685ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridJobCancelRequest;
 import org.apache.ignite.internal.GridJobExecuteRequest;
 import org.apache.ignite.internal.GridJobExecuteResponse;
@@ -179,9 +182,9 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridMessageCollection;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -194,198 +197,940 @@ import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMess
 /**
  * Message factory implementation.
  */
-public class GridIoMessageFactory implements MessageFactoryProvider {
-    /** {@inheritDoc} */
-    @Override public void registerAll(IgniteMessageFactory factory) {
-        // -54 is reserved for SQL.
-        // -46 ... -51 - snapshot messages.
-        factory.register((short)-61, IgniteDiagnosticMessage::new);
-        factory.register((short)-53, SchemaOperationStatusMessage::new);
-        factory.register((short)-52, GridIntList::new);
-        factory.register((short)-51, NearCacheUpdates::new);
-        factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
-        factory.register((short)-49, UpdateErrors::new);
-        factory.register((short)-48, GridDhtAtomicNearResponse::new);
-        factory.register((short)-45, GridChangeGlobalStateMessageResponse::new);
-        factory.register((short)-44, HandshakeMessage2::new);
-        factory.register((short)-43, IgniteIoTestMessage::new);
-        factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
-        factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new);
-        factory.register((short)-26, TxLockList::new);
-        factory.register((short)-25, TxLock::new);
-        factory.register((short)-24, TxLocksRequest::new);
-        factory.register((short)-23, TxLocksResponse::new);
-        factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, NodeIdMessage::new);
-        factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, RecoveryLastReceivedMessage::new);
-        factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new);
-        factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new);
-        factory.register((short)0, GridJobCancelRequest::new);
-        factory.register((short)1, GridJobExecuteRequest::new);
-        factory.register((short)2, GridJobExecuteResponse::new);
-        factory.register((short)3, GridJobSiblingsRequest::new);
-        factory.register((short)4, GridJobSiblingsResponse::new);
-        factory.register((short)5, GridTaskCancelRequest::new);
-        factory.register((short)6, GridTaskSessionRequest::new);
-        factory.register((short)7, GridCheckpointRequest::new);
-        factory.register((short)8, GridIoMessage::new);
-        factory.register((short)9, GridIoUserMessage::new);
-        factory.register((short)10, GridDeploymentInfoBean::new);
-        factory.register((short)11, GridDeploymentRequest::new);
-        factory.register((short)12, GridDeploymentResponse::new);
-        factory.register((short)13, GridEventStorageMessage::new);
-        factory.register((short)16, GridCacheTxRecoveryRequest::new);
-        factory.register((short)17, GridCacheTxRecoveryResponse::new);
-        factory.register((short)20, GridCacheTtlUpdateRequest::new);
-        factory.register((short)21, GridDistributedLockRequest::new);
-        factory.register((short)22, GridDistributedLockResponse::new);
-        factory.register((short)23, GridDistributedTxFinishRequest::new);
-        factory.register((short)24, GridDistributedTxFinishResponse::new);
-        factory.register((short)25, GridDistributedTxPrepareRequest::new);
-        factory.register((short)26, GridDistributedTxPrepareResponse::new);
-        factory.register((short)27, GridDistributedUnlockRequest::new);
-        factory.register((short)28, GridDhtAffinityAssignmentRequest::new);
-        factory.register((short)29, GridDhtAffinityAssignmentResponse::new);
-        factory.register((short)30, GridDhtLockRequest::new);
-        factory.register((short)31, GridDhtLockResponse::new);
-        factory.register((short)32, GridDhtTxFinishRequest::new);
-        factory.register((short)33, GridDhtTxFinishResponse::new);
-        factory.register((short)34, GridDhtTxPrepareRequest::new);
-        factory.register((short)35, GridDhtTxPrepareResponse::new);
-        factory.register((short)36, GridDhtUnlockRequest::new);
-        factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
-        factory.register((short)38, GridDhtAtomicUpdateRequest::new);
-        factory.register((short)39, GridDhtAtomicUpdateResponse::new);
-        factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
-        factory.register((short)41, GridNearAtomicUpdateResponse::new);
-        factory.register((short)42, GridDhtForceKeysRequest::new);
-        factory.register((short)43, GridDhtForceKeysResponse::new);
-        factory.register((short)44, GridDhtPartitionDemandLegacyMessage::new);
-        factory.register((short)45, GridDhtPartitionDemandMessage::new);
-        factory.register((short)46, GridDhtPartitionsFullMessage::new);
-        factory.register((short)47, GridDhtPartitionsSingleMessage::new);
-        factory.register((short)48, GridDhtPartitionsSingleRequest::new);
-        factory.register((short)49, GridNearGetRequest::new);
-        factory.register((short)50, GridNearGetResponse::new);
-        factory.register((short)51, GridNearLockRequest::new);
-        factory.register((short)52, GridNearLockResponse::new);
-        factory.register((short)53, GridNearTxFinishRequest::new);
-        factory.register((short)54, GridNearTxFinishResponse::new);
-        factory.register((short)55, GridNearTxPrepareRequest::new);
-        factory.register((short)56, GridNearTxPrepareResponse::new);
-        factory.register((short)57, GridNearUnlockRequest::new);
-        factory.register((short)58, GridCacheQueryRequest::new);
-        factory.register((short)59, GridCacheQueryResponse::new);
-        factory.register((short)61, GridContinuousMessage::new);
-        factory.register((short)62, DataStreamerRequest::new);
-        factory.register((short)63, DataStreamerResponse::new);
-        factory.register((short)76, GridTaskResultRequest::new);
-        factory.register((short)77, GridTaskResultResponse::new);
-        factory.register((short)78, MissingMappingRequestMessage::new);
-        factory.register((short)79, MissingMappingResponseMessage::new);
-        factory.register((short)80, MetadataRequestMessage::new);
-        factory.register((short)81, MetadataResponseMessage::new);
-        factory.register((short)82, JobStealingRequest::new);
-        factory.register((short)84, GridByteArrayList::new);
-        factory.register((short)85, GridLongList::new);
-        factory.register((short)86, GridCacheVersion::new);
-        factory.register((short)87, GridDhtPartitionExchangeId::new);
-        factory.register((short)88, GridCacheReturn::new);
-        factory.register((short)89, CacheObjectImpl::new);
-        factory.register((short)90, KeyCacheObjectImpl::new);
-        factory.register((short)91, GridCacheEntryInfo::new);
-        factory.register((short)92, CacheEntryInfoCollection::new);
-        factory.register((short)93, CacheInvokeDirectResult::new);
-        factory.register((short)94, IgniteTxKey::new);
-        factory.register((short)95, DataStreamerEntry::new);
-        factory.register((short)96, CacheContinuousQueryEntry::new);
-        factory.register((short)97, CacheEvictionEntry::new);
-        factory.register((short)98, CacheEntryPredicateContainsValue::new);
-        factory.register((short)99, CacheEntrySerializablePredicate::new);
-        factory.register((short)100, IgniteTxEntry::new);
-        factory.register((short)101, TxEntryValueHolder::new);
-        factory.register((short)102, CacheVersionedValue::new);
-        factory.register((short)103, GridCacheRawVersionedEntry::new);
-        factory.register((short)104, GridCacheVersionEx::new);
-        factory.register((short)105, CacheObjectByteArrayImpl::new);
-        factory.register((short)106, GridQueryCancelRequest::new);
-        factory.register((short)107, GridQueryFailResponse::new);
-        factory.register((short)108, GridQueryNextPageRequest::new);
-        factory.register((short)109, GridQueryNextPageResponse::new);
-        factory.register((short)111, AffinityTopologyVersion::new);
-        factory.register((short)112, GridCacheSqlQuery::new);
-        factory.register((short)113, BinaryObjectImpl::new);
-        factory.register((short)114, GridDhtPartitionSupplyMessage::new);
-        factory.register((short)115, UUIDCollectionMessage::new);
-        factory.register((short)116, GridNearSingleGetRequest::new);
-        factory.register((short)117, GridNearSingleGetResponse::new);
-        factory.register((short)118, CacheContinuousQueryBatchAck::new);
-        factory.register((short)119, BinaryEnumObjectImpl::new);
-
-        // [120..123] - DR
-        factory.register((short)124, GridMessageCollection::new);
-        factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
-        factory.register((short)126, GridNearAtomicSingleUpdateInvokeRequest::new);
-        factory.register((short)127, GridNearAtomicSingleUpdateFilterRequest::new);
-        factory.register((short)128, CacheGroupAffinityMessage::new);
-        factory.register((short)129, WalStateAckMessage::new);
-        factory.register((short)130, UserManagementOperationFinishedMessage::new);
-        factory.register((short)131, UserAuthenticateRequestMessage::new);
-        factory.register((short)132, UserAuthenticateResponseMessage::new);
-        factory.register((short)133, ClusterMetricsUpdateMessage::new);
-        factory.register((short)134, ContinuousRoutineStartResultMessage::new);
-        factory.register((short)135, LatchAckMessage::new);
-        factory.register((short)136, MvccTxSnapshotRequest::new);
-        factory.register((short)137, MvccAckRequestTx::new);
-        factory.register((short)138, MvccFutureResponse::new);
-        factory.register((short)139, MvccQuerySnapshotRequest::new);
-        factory.register((short)140, MvccAckRequestQueryCntr::new);
-        factory.register((short)141, MvccSnapshotResponse::new);
-        factory.register((short)143, GridCacheMvccEntryInfo::new);
-        factory.register((short)144, GridDhtTxQueryEnlistResponse::new);
-        factory.register((short)145, MvccAckRequestQueryId::new);
-        factory.register((short)146, MvccAckRequestTxAndQueryCntr::new);
-        factory.register((short)147, MvccAckRequestTxAndQueryId::new);
-        factory.register((short)148, MvccVersionImpl::new);
-        factory.register((short)149, MvccActiveQueriesMessage::new);
-        factory.register((short)150, MvccSnapshotWithoutTxs::new);
-        factory.register((short)151, GridNearTxQueryEnlistRequest::new);
-        factory.register((short)152, GridNearTxQueryEnlistResponse::new);
-        factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new);
-        factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new);
-        factory.register((short)155, GridDhtTxQueryEnlistRequest::new);
-        factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new);
-        factory.register((short)157, PartitionUpdateCountersMessage::new);
-        factory.register((short)158, GridDhtPartitionSupplyMessageV2::new);
-        factory.register((short)159, GridNearTxEnlistRequest::new);
-        factory.register((short)160, GridNearTxEnlistResponse::new);
-        factory.register((short)161, GridInvokeValue::new);
-        factory.register((short)162, GenerateEncryptionKeyRequest::new);
-        factory.register((short)163, GenerateEncryptionKeyResponse::new);
-        factory.register((short)164, MvccRecoveryFinishedMessage::new);
-        factory.register((short)165, PartitionCountersNeighborcastRequest::new);
-        factory.register((short)166, PartitionCountersNeighborcastResponse::new);
-        factory.register((short)167, ServiceDeploymentProcessId::new);
-        factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new);
-        factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
-        factory.register((short)170, DeadlockProbe::new);
-        factory.register((short)171, ProbedTx::new);
-        factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new);
-        factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new);
-        factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
-        factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
-        factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
-        factory.register((short)177, TcpInverseConnectionResponseMessage::new);
-
-        // [-3..119] [124..129] [-23..-28] [-36..-55] - this
-        // [120..123] - DR
-        // [-4..-22, -30..-35] - SQL
-        // [2048..2053] - Snapshots
-        // [-42..-37] - former hadoop.
-        // [64..71] - former IGFS.
+public class GridIoMessageFactory implements MessageFactory {
+    /** Custom messages registry. Used for test purposes. */
+    private static final Map<Short, IgniteOutClosure<Message>> CUSTOM = new ConcurrentHashMap<>();
+
+    /** Extensions. */
+    private final MessageFactory[] ext;
+
+    /**
+     * @param ext Extensions.
+     */
+    public GridIoMessageFactory(MessageFactory[] ext) {
+        this.ext = ext;
     }
 
     /** {@inheritDoc} */
     @Override public Message create(short type) {
-        throw new UnsupportedOperationException();
+        Message msg = null;
+
+        switch (type) {
+            // -54 is reserved for SQL.
+            // -46 ... -51 - snapshot messages.
+            case -61:
+                msg = new IgniteDiagnosticMessage();
+
+                break;
+
+            case -53:
+                msg = new SchemaOperationStatusMessage();
+
+                break;
+
+            case -52:
+                msg = new GridIntList();
+
+                break;
+
+            case -51:
+                msg = new NearCacheUpdates();
+
+                break;
+
+            case -50:
+                msg = new GridNearAtomicCheckUpdateRequest();
+
+                break;
+
+            case -49:
+                msg = new UpdateErrors();
+
+                break;
+
+            case -48:
+                msg = new GridDhtAtomicNearResponse();
+
+                break;
+
+            case -45:
+                msg = new GridChangeGlobalStateMessageResponse();
+
+                break;
+
+            case -44:
+                msg = new HandshakeMessage2();
+
+                break;
+
+            case -43:
+                msg = new IgniteIoTestMessage();
+
+                break;
+
+            case -36:
+                msg = new GridDhtAtomicSingleUpdateRequest();
+
+                break;
+
+            case -27:
+                msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+                break;
+
+            case -26:
+                msg = new TxLockList();
+
+                break;
+
+            case -25:
+                msg = new TxLock();
+
+                break;
+
+            case -24:
+                msg = new TxLocksRequest();
+
+                break;
+
+            case -23:
+                msg = new TxLocksResponse();
+
+                break;
+
+            case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
+                msg = new NodeIdMessage();
+
+                break;
+
+            case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
+                msg = new RecoveryLastReceivedMessage();
+
+                break;
+
+            case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
+                msg = new HandshakeMessage();
+
+                break;
+
+            case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE:
+                msg = new HandshakeWaitMessage();
+
+                break;
+
+            case 0:
+                msg = new GridJobCancelRequest();
+
+                break;
+
+            case 1:
+                msg = new GridJobExecuteRequest();
+
+                break;
+
+            case 2:
+                msg = new GridJobExecuteResponse();
+
+                break;
+
+            case 3:
+                msg = new GridJobSiblingsRequest();
+
+                break;
+
+            case 4:
+                msg = new GridJobSiblingsResponse();
+
+                break;
+
+            case 5:
+                msg = new GridTaskCancelRequest();
+
+                break;
+
+            case 6:
+                msg = new GridTaskSessionRequest();
+
+                break;
+
+            case 7:
+                msg = new GridCheckpointRequest();
+
+                break;
+
+            case 8:
+                msg = new GridIoMessage();
+
+                break;
+
+            case 9:
+                msg = new GridIoUserMessage();
+
+                break;
+
+            case 10:
+                msg = new GridDeploymentInfoBean();
+
+                break;
+
+            case 11:
+                msg = new GridDeploymentRequest();
+
+                break;
+
+            case 12:
+                msg = new GridDeploymentResponse();
+
+                break;
+
+            case 13:
+                msg = new GridEventStorageMessage();
+
+                break;
+
+            case 16:
+                msg = new GridCacheTxRecoveryRequest();
+
+                break;
+
+            case 17:
+                msg = new GridCacheTxRecoveryResponse();
+
+                break;
+
+            case 20:
+                msg = new GridCacheTtlUpdateRequest();
+
+                break;
+
+            case 21:
+                msg = new GridDistributedLockRequest();
+
+                break;
+
+            case 22:
+                msg = new GridDistributedLockResponse();
+
+                break;
+
+            case 23:
+                msg = new GridDistributedTxFinishRequest();
+
+                break;
+
+            case 24:
+                msg = new GridDistributedTxFinishResponse();
+
+                break;
+
+            case 25:
+                msg = new GridDistributedTxPrepareRequest();
+
+                break;
+
+            case 26:
+                msg = new GridDistributedTxPrepareResponse();
+
+                break;
+
+            case 27:
+                msg = new GridDistributedUnlockRequest();
+
+                break;
+
+            case 28:
+                msg = new GridDhtAffinityAssignmentRequest();
+
+                break;
+
+            case 29:
+                msg = new GridDhtAffinityAssignmentResponse();
+
+                break;
+
+            case 30:
+                msg = new GridDhtLockRequest();
+
+                break;
+
+            case 31:
+                msg = new GridDhtLockResponse();
+
+                break;
+
+            case 32:
+                msg = new GridDhtTxFinishRequest();
+
+                break;
+
+            case 33:
+                msg = new GridDhtTxFinishResponse();
+
+                break;
+
+            case 34:
+                msg = new GridDhtTxPrepareRequest();
+
+                break;
+
+            case 35:
+                msg = new GridDhtTxPrepareResponse();
+
+                break;
+
+            case 36:
+                msg = new GridDhtUnlockRequest();
+
+                break;
+
+            case 37:
+                msg = new GridDhtAtomicDeferredUpdateResponse();
+
+                break;
+
+            case 38:
+                msg = new GridDhtAtomicUpdateRequest();
+
+                break;
+
+            case 39:
+                msg = new GridDhtAtomicUpdateResponse();
+
+                break;
+
+            case 40:
+                msg = new GridNearAtomicFullUpdateRequest();
+
+                break;
+
+            case 41:
+                msg = new GridNearAtomicUpdateResponse();
+
+                break;
+
+            case 42:
+                msg = new GridDhtForceKeysRequest();
+
+                break;
+
+            case 43:
+                msg = new GridDhtForceKeysResponse();
+
+                break;
+
+            case 44:
+                msg = new GridDhtPartitionDemandLegacyMessage();
+
+                break;
+
+            case 45:
+                msg = new GridDhtPartitionDemandMessage();
+
+                break;
+
+            case 46:
+                msg = new GridDhtPartitionsFullMessage();
+
+                break;
+
+            case 47:
+                msg = new GridDhtPartitionsSingleMessage();
+
+                break;
+
+            case 48:
+                msg = new GridDhtPartitionsSingleRequest();
+
+                break;
+
+            case 49:
+                msg = new GridNearGetRequest();
+
+                break;
+
+            case 50:
+                msg = new GridNearGetResponse();
+
+                break;
+
+            case 51:
+                msg = new GridNearLockRequest();
+
+                break;
+
+            case 52:
+                msg = new GridNearLockResponse();
+
+                break;
+
+            case 53:
+                msg = new GridNearTxFinishRequest();
+
+                break;
+
+            case 54:
+                msg = new GridNearTxFinishResponse();
+
+                break;
+
+            case 55:
+                msg = new GridNearTxPrepareRequest();
+
+                break;
+
+            case 56:
+                msg = new GridNearTxPrepareResponse();
+
+                break;
+
+            case 57:
+                msg = new GridNearUnlockRequest();
+
+                break;
+
+            case 58:
+                msg = new GridCacheQueryRequest();
+
+                break;
+
+            case 59:
+                msg = new GridCacheQueryResponse();
+
+                break;
+
+            case 61:
+                msg = new GridContinuousMessage();
+
+                break;
+
+            case 62:
+                msg = new DataStreamerRequest();
+
+                break;
+
+            case 63:
+                msg = new DataStreamerResponse();
+
+                break;
+
+            case 76:
+                msg = new GridTaskResultRequest();
+
+                break;
+
+            case 77:
+                msg = new GridTaskResultResponse();
+
+                break;
+
+            case 78:
+                msg = new MissingMappingRequestMessage();
+
+                break;
+
+            case 79:
+                msg = new MissingMappingResponseMessage();
+
+                break;
+
+            case 80:
+                msg = new MetadataRequestMessage();
+
+                break;
+
+            case 81:
+                msg = new MetadataResponseMessage();
+
+                break;
+
+            case 82:
+                msg = new JobStealingRequest();
+
+                break;
+
+            case 84:
+                msg = new GridByteArrayList();
+
+                break;
+
+            case 85:
+                msg = new GridLongList();
+
+                break;
+
+            case 86:
+                msg = new GridCacheVersion();
+
+                break;
+
+            case 87:
+                msg = new GridDhtPartitionExchangeId();
+
+                break;
+
+            case 88:
+                msg = new GridCacheReturn();
+
+                break;
+
+            case 89:
+                msg = new CacheObjectImpl();
+
+                break;
+
+            case 90:
+                msg = new KeyCacheObjectImpl();
+
+                break;
+
+            case 91:
+                msg = new GridCacheEntryInfo();
+
+                break;
+
+            case 92:
+                msg = new CacheEntryInfoCollection();
+
+                break;
+
+            case 93:
+                msg = new CacheInvokeDirectResult();
+
+                break;
+
+            case 94:
+                msg = new IgniteTxKey();
+
+                break;
+
+            case 95:
+                msg = new DataStreamerEntry();
+
+                break;
+
+            case 96:
+                msg = new CacheContinuousQueryEntry();
+
+                break;
+
+            case 97:
+                msg = new CacheEvictionEntry();
+
+                break;
+
+            case 98:
+                msg = new CacheEntryPredicateContainsValue();
+
+                break;
+
+            case 99:
+                msg = new CacheEntrySerializablePredicate();
+
+                break;
+
+            case 100:
+                msg = new IgniteTxEntry();
+
+                break;
+
+            case 101:
+                msg = new TxEntryValueHolder();
+
+                break;
+
+            case 102:
+                msg = new CacheVersionedValue();
+
+                break;
+
+            case 103:
+                msg = new GridCacheRawVersionedEntry<>();
+
+                break;
+
+            case 104:
+                msg = new GridCacheVersionEx();
+
+                break;
+
+            case 105:
+                msg = new CacheObjectByteArrayImpl();
+
+                break;
+
+            case 106:
+                msg = new GridQueryCancelRequest();
+
+                break;
+
+            case 107:
+                msg = new GridQueryFailResponse();
+
+                break;
+
+            case 108:
+                msg = new GridQueryNextPageRequest();
+
+                break;
+
+            case 109:
+                msg = new GridQueryNextPageResponse();
+
+                break;
+
+            case 110:
+                // EMPTY type: msg stays null, so type 110 is rejected below without consulting extensions.
+                // GridQueryRequest was removed
+                break;
+
+            case 111:
+                msg = new AffinityTopologyVersion();
+
+                break;
+
+            case 112:
+                msg = new GridCacheSqlQuery();
+
+                break;
+
+            case 113:
+                msg = new BinaryObjectImpl();
+
+                break;
+
+            case 114:
+                msg = new GridDhtPartitionSupplyMessage();
+
+                break;
+
+            case 115:
+                msg = new UUIDCollectionMessage();
+
+                break;
+
+            case 116:
+                msg = new GridNearSingleGetRequest();
+
+                break;
+
+            case 117:
+                msg = new GridNearSingleGetResponse();
+
+                break;
+
+            case 118:
+                msg = new CacheContinuousQueryBatchAck();
+
+                break;
+
+            case 119:
+                msg = new BinaryEnumObjectImpl();
+
+                break;
+
+            // [120..123] - DR
+            case 124:
+                msg = new GridMessageCollection<>();
+
+                break;
+
+            case 125:
+                msg = new GridNearAtomicSingleUpdateRequest();
+
+                break;
+
+            case 126:
+                msg = new GridNearAtomicSingleUpdateInvokeRequest();
+
+                break;
+
+            case 127:
+                msg = new GridNearAtomicSingleUpdateFilterRequest();
+
+                break;
+
+            case 128:
+                msg = new CacheGroupAffinityMessage();
+
+                break;
+
+            case 129:
+                msg = new WalStateAckMessage();
+
+                break;
+
+            case 130:
+                msg = new UserManagementOperationFinishedMessage();
+
+                break;
+
+            case 131:
+                msg = new UserAuthenticateRequestMessage();
+
+                break;
+
+            case 132:
+                msg = new UserAuthenticateResponseMessage();
+
+                break;
+
+            case 133:
+                msg = new ClusterMetricsUpdateMessage();
+
+                break;
+
+            case 134:
+                msg = new ContinuousRoutineStartResultMessage();
+
+                break;
+
+            case 135:
+                msg = new LatchAckMessage();
+
+                break;
+
+            case 136:
+                msg = new MvccTxSnapshotRequest();
+
+                break;
+
+            case 137:
+                msg = new MvccAckRequestTx();
+
+                break;
+
+            case 138:
+                msg = new MvccFutureResponse();
+
+                break;
+
+            case 139:
+                msg = new MvccQuerySnapshotRequest();
+
+                break;
+
+            case 140:
+                msg = new MvccAckRequestQueryCntr();
+
+                break;
+
+            case 141:
+                msg = new MvccSnapshotResponse();
+
+                break;
+
+            case 143:
+                msg = new GridCacheMvccEntryInfo();
+
+                break;
+
+            case 144:
+                msg = new GridDhtTxQueryEnlistResponse();
+
+                break;
+
+            case 145:
+                msg = new MvccAckRequestQueryId();
+
+                break;
+
+            case 146:
+                msg = new MvccAckRequestTxAndQueryCntr();
+
+                break;
+
+            case 147:
+                msg = new MvccAckRequestTxAndQueryId();
+
+                break;
+
+            case 148:
+                msg = new MvccVersionImpl();
+
+                break;
+
+            case 149:
+                msg = new MvccActiveQueriesMessage();
+
+                break;
+
+            case 150:
+                msg = new MvccSnapshotWithoutTxs();
+
+                break;
+
+            case 151:
+                msg = new GridNearTxQueryEnlistRequest();
+
+                break;
+
+            case 152:
+                msg = new GridNearTxQueryEnlistResponse();
+
+                break;
+
+            case 153:
+                msg = new GridNearTxQueryResultsEnlistRequest();
+
+                break;
+
+            case 154:
+                msg = new GridNearTxQueryResultsEnlistResponse();
+
+                break;
+
+            case 155:
+                msg = new GridDhtTxQueryEnlistRequest();
+
+                break;
+
+            case 156:
+                msg = new GridDhtTxQueryFirstEnlistRequest();
+
+                break;
+
+            case 157:
+                msg = new PartitionUpdateCountersMessage();
+
+                break;
+
+            case 158:
+                msg = new GridDhtPartitionSupplyMessageV2();
+
+                break;
+
+            case 159:
+                msg = new GridNearTxEnlistRequest();
+
+                break;
+
+            case 160:
+                msg = new GridNearTxEnlistResponse();
+
+                break;
+
+            case 161:
+                msg = new GridInvokeValue();
+
+                break;
+
+            case 162:
+                msg = new GenerateEncryptionKeyRequest();
+
+                break;
+
+            case 163:
+                msg = new GenerateEncryptionKeyResponse();
+
+                break;
+
+            case 164:
+                msg = new MvccRecoveryFinishedMessage();
+
+                break;
+
+            case 165:
+                msg = new PartitionCountersNeighborcastRequest();
+
+                break;
+
+            case 166:
+                msg = new PartitionCountersNeighborcastResponse();
+
+                break;
+
+            case 167:
+                msg = new ServiceDeploymentProcessId();
+
+                break;
+
+            case 168:
+                msg = new ServiceSingleNodeDeploymentResultBatch();
+
+                break;
+
+            case 169:
+                msg = new ServiceSingleNodeDeploymentResult();
+
+                break;
+
+            case 170:
+                msg = new DeadlockProbe();
+
+                break;
+
+            case 171:
+                msg = new ProbedTx();
+
+                break;
+
+            case GridQueryKillRequest.TYPE_CODE:
+                msg = new GridQueryKillRequest();
+
+                break;
+
+            case GridQueryKillResponse.TYPE_CODE:
+                msg = new GridQueryKillResponse();
+
+                break;
+
+            case GridIoSecurityAwareMessage.TYPE_CODE:
+                msg = new GridIoSecurityAwareMessage();
+
+                break;
+
+            case SessionChannelMessage.TYPE_CODE:
+                msg = new SessionChannelMessage();
+
+                break;
+
+            case SingleNodeMessage.TYPE_CODE:
+                msg = new SingleNodeMessage<>();
+
+                break;
+
+            case 177:
+                msg = new TcpInverseConnectionResponseMessage();
+
+                break;
+
+            // [-3..119] [124..129] [-23..-28] [-36..-55] - this
+            // [120..123] - DR
+            // [-4..-22, -30..-35] - SQL
+            // [2048..2053] - Snapshots
+            default:
+                if (ext != null) {
+                    for (MessageFactory factory : ext) {
+                        msg = factory.create(type);
+
+                        if (msg != null)
+                            break;
+                    }
+                }
+
+                if (msg == null) {
+                    IgniteOutClosure<Message> c = CUSTOM.get(type);
+
+                    if (c != null)
+                        msg = c.apply();
+                }
+        }
+
+        if (msg == null)
+            throw new IgniteException("Invalid message type: " + type);
+
+        return msg;
+    }
+
+    /**
+     * Registers factory for custom message, replacing any previous one for the type. Used for test purposes.
+     *
+     * @param type Message direct type used as the registry key.
+     * @param c Message producer; must not be {@code null} (checked by an assertion only).
+     */
+    public static void registerCustom(short type, IgniteOutClosure<Message> c) {
+        assert c != null;
+
+        CUSTOM.put(type, c);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
deleted file mode 100644
index eb89043..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
-
-/**
- * Message factory implementation which is responsible for instantiation of all communication messages.
- */
-public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
-    /** Offset. */
-    private static final int OFF = -Short.MIN_VALUE;
-
-    /** Array size. */
-    private static final int ARR_SIZE = 1 << Short.SIZE;
-
-    /** Custom messages registry. Used for test purposes. */
-    private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>();
-
-    /** Message suppliers. */
-    private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE);
-
-    /** Initialized flag. If {@code true} then new message type couldn't be registered. */
-    private boolean initialized;
-
-    /**
-     * Contructor.
-     *
-     * @param factories Concrete message factories or message factory providers. Cfn't be empty or {@code null}.
-     */
-    public IgniteMessageFactoryImpl(MessageFactory[] factories) {
-        if (factories == null || factories.length == 0)
-            throw new IllegalArgumentException("Message factory couldn't be initialized. Factories aren't provided.");
-
-        List<MessageFactory> old = new ArrayList<>(factories.length);
-
-        for (MessageFactory factory : factories) {
-            if (factory instanceof MessageFactoryProvider) {
-                MessageFactoryProvider p = (MessageFactoryProvider)factory;
-
-                p.registerAll(this);
-            }
-            else
-                old.add(factory);
-        }
-
-        if (!old.isEmpty()) {
-            for (int i = 0; i < ARR_SIZE; i++) {
-                Supplier<Message> curr = msgSuppliers[i];
-
-                if (curr == null) {
-                    short directType = indexToDirectType(i);
-
-                    for (MessageFactory factory : old) {
-                        Message msg = factory.create(directType);
-
-                        if (msg != null)
-                            register(directType, () -> factory.create(directType));
-                    }
-                }
-            }
-        }
-
-        initialized = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
-        if (initialized) {
-            throw new IllegalStateException("Message factory is already initialized. " +
-                    "Registration of new message types is forbidden.");
-        }
-
-        int idx = directTypeToIndex(directType);
-
-        Supplier<Message> curr = msgSuppliers[idx];
-
-        if (curr == null)
-            msgSuppliers[idx] = supplier;
-        else
-            throw new IgniteException("Message factory is already registered for direct type: " + directType);
-    }
-
-    /**
-     * Creates new message instance of provided direct type.
-     * <p>
-     *
-     * @param directType Message direct type.
-     * @return Message instance.
-     * @throws IgniteException If there are no any message factory for given {@code directType}.
-     */
-    @Override public @Nullable Message create(short directType) {
-        Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];
-
-        if (supplier == null)
-            supplier = CUSTOM.get(directType);
-
-        if (supplier == null)
-            throw new IgniteException("Invalid message type: " + directType);
-
-        return supplier.get();
-    }
-
-    /**
-     * @param directType Direct type.
-     */
-    private static int directTypeToIndex(short directType) {
-        return directType + OFF;
-    }
-
-    /**
-     * @param idx Index.
-     */
-    private static short indexToDirectType(int idx) {
-        int res = idx - OFF;
-
-        assert res >= Short.MIN_VALUE && res <= Short.MAX_VALUE;
-
-        return (short)res;
-    }
-
-    /**
-     * Registers factory for custom message. Used for test purposes.
-     *
-     * @param type Message type.
-     * @param c Message producer.
-     *
-     * @deprecated Should be removed. Please don't use this method anymore.
-     * Consider using of plugin with own message types.
-     */
-    @TestOnly
-    @Deprecated
-    public static void registerCustom(short type, Supplier<Message> c) {
-        assert c != null;
-
-        CUSTOM.put(type, c);
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
deleted file mode 100644
index ae159b3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import java.util.function.Supplier;
-import org.apache.ignite.IgniteException;
-
-/**
- * Message factory for all communication messages registered using {@link #register(short, Supplier)} method call.
- */
-public interface IgniteMessageFactory extends MessageFactory {
-    /**
-     * Register message factory with given direct type. All messages must be registered during construction
-     * of class which implements this interface. Any invocation of this method after initialization is done must
-     * throw {@link IllegalStateException} exception.
-     *
-     * @param directType Direct type.
-     * @param supplier Message factory.
-     * @throws IgniteException In case of attempt to register message with direct type which is already registered.
-     * @throws IllegalStateException On any invocation of this method when class which implements this interface
-     * is alredy constructed.
-     */
-    public void register(short directType, Supplier<Message> supplier) throws IgniteException;
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
index 07a3d5b..1ea88fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -26,10 +26,7 @@ import org.jetbrains.annotations.Nullable;
  * A plugin can provide his own message factory as an extension
  * if it uses any custom messages (all message must extend
  * {@link Message} class).
- *
- * @deprecated Use {@link MessageFactoryProvider} instead.
  */
-@Deprecated
 public interface MessageFactory extends Extension {
     /**
      * Creates new message instance of provided type.
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
deleted file mode 100644
index 910c68c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Provider of communication message factories.
- * <p>
- * Implementation of this interface is responsible for registration of all message factories in
- * {@link #registerAll} method.
- * <p>
- * {@link #registerAll} method's call is responsibility of {@link IgniteMessageFactory} implementation.
- */
-public interface MessageFactoryProvider extends MessageFactory {
-    /**
-     * Registers all messages factories. See {@link IgniteMessageFactory#register}.
-     *
-     * @param factory {@link IgniteMessageFactory} implementation.
-     */
-    public void registerAll(IgniteMessageFactory factory);
-
-    /**
-     * Always throws {@link UnsupportedOperationException}.
-     * @param type Message direct type.
-     * @throws UnsupportedOperationException On any invocation.
-     */
-    @Override @Nullable public default Message create(short type) {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index afece1f..361409b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -109,7 +109,7 @@ class TcpCommunicationMetricsListener {
     private final Object msgTypMapMux = new Object();
 
     /** Message type map. */
-    private volatile Map<Short, String> msgTypeMap;
+    private volatile Map<Short, String> msgTypMap;
 
     /** */
     public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
@@ -285,7 +285,7 @@ class TcpCommunicationMetricsListener {
             if (metric.name().startsWith(prefix)) {
                 short directType = Short.parseShort(metric.name().substring(prefix.length()));
 
-                Map<Short, String> msgTypMap0 = msgTypeMap;
+                Map<Short, String> msgTypMap0 = msgTypMap;
 
                 if (msgTypMap0 != null) {
                     String typeName = msgTypMap0.get(directType);
@@ -374,24 +374,24 @@ class TcpCommunicationMetricsListener {
     private void updateMessageTypeMap(Message msg) {
         short typeId = msg.directType();
 
-        Map<Short, String> msgTypMap0 = msgTypeMap;
+        Map<Short, String> msgTypMap0 = msgTypMap;
 
         if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
             synchronized (msgTypMapMux) {
-                if (msgTypeMap == null) {
+                if (msgTypMap == null) {
                     msgTypMap0 = new HashMap<>();
 
                     msgTypMap0.put(typeId, msg.getClass().getName());
 
-                    msgTypeMap = msgTypMap0;
+                    msgTypMap = msgTypMap0;
                 }
                 else {
-                    if (!msgTypeMap.containsKey(typeId)) {
-                        msgTypMap0 = new HashMap<>(msgTypeMap);
+                    if (!msgTypMap.containsKey(typeId)) {
+                        msgTypMap0 = new HashMap<>(msgTypMap);
 
                         msgTypMap0.put(typeId, msg.getClass().getName());
 
-                        msgTypeMap = msgTypMap0;
+                        msgTypMap = msgTypMap0;
                     }
                 }
             }
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
index 7704c0b..6bca88f 100644
--- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -2,4 +2,3 @@ org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteSta
 org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider
 org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin
 org.apache.ignite.plugin.NodeValidationPluginProvider
-
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 6dd103e..108e4672 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -22,9 +22,11 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiContext;
@@ -46,7 +48,11 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
     private static final short DIRECT_TYPE = 210;
 
     static {
-        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new);
+        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridIoUserMessage();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -174,7 +180,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
         private IgniteSpiContext spiCtx;
 
         /** Test message topic. **/
-        private static final String TEST_TOPIC = "test_topic";
+        private String TEST_TOPIC = "test_topic";
 
         /** {@inheritDoc} */
         @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
@@ -186,7 +192,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
             // No-op.
         }
 
-        /** {@inheritDoc} */
         @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
             this.spiCtx = spiCtx;
 
@@ -198,7 +203,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         }
 
-        /** {@inheritDoc} */
         @Override public void onContextDestroyed0() {
             spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
                 @Override public boolean apply(UUID uuid, Object o) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 0a75cf5..8a27a46 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -45,10 +46,20 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
     /** */
     private static final short DIRECT_TYPE_OVER_BYTE = 1000;
 
-    static {
-        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new);
+    /** */
+    private int bufSize;
 
-        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
+    static {
+        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestMessage();
+            }
+        });
+        GridIoMessageFactory.registerCustom(DIRECT_TYPE_OVER_BYTE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestOverByteIdMessage();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -97,6 +108,8 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
      */
     @Test
     public void testSendMessageWithBuffer() throws Exception {
+        bufSize = 8192;
+
         try {
             startGridsMultiThreaded(2);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
deleted file mode 100644
index c55cc0d..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for default implementation of {@link IgniteMessageFactory} interface.
- */
-public class IgniteMessageFactoryImplTest {
-    /** Test message 1 type. */
-    private static final short TEST_MSG_1_TYPE = 1;
-
-    /** Test message 2 type. */
-    private static final short TEST_MSG_2_TYPE = 2;
-
-    /** Unknown message type. */
-    private static final short UNKNOWN_MSG_TYPE = 0;
-
-    /**
-     * Tests that impossible register new message after initialization.
-     */
-    @Test(expected = IllegalStateException.class)
-    public void testReadOnly() {
-        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
-        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
-        msgFactory.register((short)0, () -> null);
-    }
-
-    /**
-     * Tests that proper message type will be returned by message factory.
-     */
-    @Test
-    public void testCreate() {
-        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
-        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
-        Message msg;
-
-        msg = msgFactory.create(TEST_MSG_1_TYPE);
-        assertTrue(msg instanceof TestMessage1);
-
-        msg = msgFactory.create(TEST_MSG_2_TYPE);
-        assertTrue(msg instanceof TestMessage2);
-
-        msg = msgFactory.create(TEST_MSG_2_TYPE);
-        assertTrue(msg instanceof TestMessage2);
-    }
-
-    /**
-     * Tests that exception will be thrown for unknown message direct type.
-     */
-    @Test(expected = IgniteException.class)
-    public void testCreate_UnknownMessageType() {
-        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
-        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
-        msgFactory.create(UNKNOWN_MSG_TYPE);
-    }
-
-    /**
-     * Tests attemption of registration message with already registered message type.
-     */
-    @Test(expected = IgniteException.class)
-    @SuppressWarnings("ResultOfObjectAllocationIgnored")
-    public void testRegisterTheSameType() {
-        MessageFactory[] factories = {
-                new TestMessageFactoryPovider(),
-                new TestMessageFactory(),
-                new TestMessageFactoryPoviderWithTheSameDirectType()
-        };
-
-        new IgniteMessageFactoryImpl(factories);
-    }
-
-    /**
-     * {@link MessageFactoryProvider} implementation.
-     */
-    private static class TestMessageFactoryPovider implements MessageFactoryProvider {
-        /** {@inheritDoc} */
-        @Override public void registerAll(IgniteMessageFactory factory) {
-            factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
-        }
-    }
-
-    /**
-     * {@link MessageFactoryProvider} implementation with message direct type which is already registered.
-     */
-    private static class TestMessageFactoryPoviderWithTheSameDirectType implements MessageFactoryProvider {
-        /** {@inheritDoc} */
-        @Override public void registerAll(IgniteMessageFactory factory) {
-            factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
-        }
-    }
-
-    /**
-     * {@link MessageFactory} implementation whish still uses creation with switch-case.
-     */
-    private static class TestMessageFactory implements MessageFactory {
-        /** {@inheritDoc} */
-        @Override public @Nullable Message create(short type) {
-            switch (type) {
-                case TEST_MSG_2_TYPE:
-                    return new TestMessage2();
-
-                default:
-                    return null;
-            }
-        }
-    }
-
-    /** Test message. */
-    private static class TestMessage1 implements Message {
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return 1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-    }
-
-    /** Test message. */
-    private static class TestMessage2 implements Message {
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return 2;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-    }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
deleted file mode 100644
index 6046f4a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-import java.util.UUID;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.plugin.CachePluginContext;
-import org.apache.ignite.plugin.CachePluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.IgnitePlugin;
-import org.apache.ignite.plugin.PluginConfiguration;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.PluginProvider;
-import org.apache.ignite.plugin.PluginValidationException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
-
-/**
- * Tests that node will not start if some component tries to register message factory with direct type
- * for which message factory is already registered.
- */
-public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
-    /** Test plugin name. */
-    private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN";
-
-    /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */
-    private static final short MSG_DIRECT_TYPE = -44;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
-        cfg.setPluginProviders(new TestPluginProvider());
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-    }
-
-    /**
-     * Tests that node will not start if some component tries to register message factory with direct type
-     * for which message factory is already registered.
-     */
-    @Test
-    public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception {
-        assertThrows(log, (Callable<Object>)this::startGrid, IgniteCheckedException.class,
-                "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE);
-    }
-
-    /** Plugin with own message factory. */
-    private static class TestPlugin implements IgnitePlugin {
-    }
-
-    /** */
-    public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
-        /** {@inheritDoc} */
-        @Override public String name() {
-            return TEST_PLUGIN_NAME;
-        }
-
-        /** {@inheritDoc} */
-        @Override  public <T extends IgnitePlugin> T plugin() {
-            return (T)new TestPlugin();
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String version() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String copyright() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
-            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
-                @Override public void registerAll(IgniteMessageFactory factory) {
-                    factory.register(MSG_DIRECT_TYPE, TestMessage::new);
-                }
-            });
-        }
-
-        /** {@inheritDoc} */
-        @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void start(PluginContext ctx) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void stop(boolean cancel) throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onIgniteStart() throws IgniteCheckedException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onIgniteStop(boolean cancel) {
-            // no-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void validateNewNode(ClusterNode node, Serializable data) {
-            // No-op.
-        }
-    }
-
-    /** */
-    private static class TestPluginConfiguration implements PluginConfiguration {
-    }
-
-    /** Test message with already registered direct type. */
-    private static class TestMessage implements Message {
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return MSG_DIRECT_TYPE;
-        }
-
-        /** {@inheritDoc} */
-        @Override public byte fieldsCount() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-    }
-
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index aaa42b1..12490d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,8 +22,10 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -40,7 +42,11 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+        GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestMessage();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -56,8 +62,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      * @return Cache configuration.
      * @throws Exception In case of error.
      */
-    protected CacheConfiguration<?, ?> cacheConfiguration() throws Exception {
-        CacheConfiguration<?, ?> cfg = defaultCacheConfiguration();
+    protected CacheConfiguration cacheConfiguration() throws Exception {
+        CacheConfiguration cfg = defaultCacheConfiguration();
 
         cfg.setCacheMode(PARTITIONED);
         cfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -107,7 +113,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      */
     @Test
     public void testAddedDeploymentInfo() throws Exception {
-        GridCacheContext<?, ?> ctx = cacheContext();
+        GridCacheContext ctx = cacheContext();
 
         if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
             assertFalse(ctx.deploymentEnabled());
@@ -131,7 +137,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      */
     @Test
     public void testAddedDeploymentInfo2() throws Exception {
-        GridCacheContext<?, ?> ctx = cacheContext();
+        GridCacheContext ctx = cacheContext();
 
         if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
             assertFalse(ctx.deploymentEnabled());
@@ -155,8 +161,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
     /**
      * @return Cache context.
      */
-    protected GridCacheContext<?, ?> cacheContext() {
-        return ((IgniteCacheProxy<?, ?>)grid(0).cache(DEFAULT_CACHE_NAME)).context();
+    protected GridCacheContext cacheContext() {
+        return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context();
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index 49d588f..aeaabda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -33,11 +33,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -62,7 +60,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setAtomicityMode(atomicityMode());
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -155,9 +153,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
         e0.writeTo(buf, writer);
 
         CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
-        IgniteMessageFactoryImpl msgFactory =
-                new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
-        e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(msgFactory, (byte)1));
+        e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1));
 
         assertEquals(e0.cacheId(), e1.cacheId());
         assertEquals(e0.eventType(), e1.eventType());
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 326b716..8034093 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -29,8 +29,9 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -51,7 +52,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
  * Super class for all communication self tests.
  * @param <T> Type of communication SPI.
  */
-public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
     /** */
     private static long msgId = 1;
 
@@ -74,13 +75,17 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     private static GridTimeoutProcessor timeoutProcessor;
 
     /** */
-    protected boolean useSsl;
+    protected boolean useSsl = false;
 
     /**
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /** */
@@ -157,7 +162,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             for (ClusterNode node : nodes) {
                 synchronized (mux) {
                     if (!msgDestMap.containsKey(entry.getKey()))
-                        msgDestMap.put(entry.getKey(), new HashSet<>());
+                        msgDestMap.put(entry.getKey(), new HashSet<UUID>());
 
                     msgDestMap.get(entry.getKey()).add(node.id());
                 }
@@ -203,7 +208,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             for (ClusterNode node : nodes) {
                 synchronized (mux) {
                     if (!msgDestMap.containsKey(sndId))
-                        msgDestMap.put(sndId, new HashSet<>());
+                        msgDestMap.put(sndId, new HashSet<UUID>());
 
                     msgDestMap.get(sndId).add(node.id());
                 }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 21a846d..797328e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -31,13 +31,14 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -65,15 +66,35 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+        GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestMessage();
+            }
+        });
 
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
 
-        IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new);
+        GridIoMessageFactory.registerCustom(TestMessage1.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestMessage1();
+            }
+        });
 
-        IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new);
+        GridIoMessageFactory.registerCustom(TestMessage2.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestMessage2();
+            }
+        });
 
-        IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
+        GridIoMessageFactory.registerCustom(TestBadMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new TestBadMessage();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -84,7 +105,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
         cfg.setFailureHandler(new TestFailureHandler());
 
-        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(CacheMode.PARTITIONED);
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
@@ -125,25 +146,25 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         try {
             startGrids(2);
 
-            IgniteEx ignite0 = grid(0);
-            IgniteEx ignite1 = grid(1);
+            Ignite ignite0 = grid(0);
+            Ignite ignite1 = grid(1);
 
-            ignite0.context().cache().context().io().addCacheHandler(
+            ((IgniteKernal)ignite0).context().cache().context().io().addCacheHandler(
                 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
                 @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                     throw new RuntimeException("Test bad message exception");
                 }
             });
 
-            ignite1.context().cache().context().io().addCacheHandler(
+            ((IgniteKernal)ignite1).context().cache().context().io().addCacheHandler(
                 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
                     @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                         throw new RuntimeException("Test bad message exception");
                     }
                 });
 
-            ignite0.context().cache().context().io().send(
-                ignite1.localNode().id(), new TestBadMessage(), (byte)2);
+            ((IgniteKernal)ignite0).context().cache().context().io().send(
+                ((IgniteKernal)ignite1).localNode().id(), new TestBadMessage(), (byte)2);
 
             boolean res = failureLatch.await(5, TimeUnit.SECONDS);
 
@@ -158,8 +179,8 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void doSend() throws Exception {
-        GridIoManager mgr0 = grid(0).context().io();
-        GridIoManager mgr1 = grid(1).context().io();
+        GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
+        GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
 
         String topic = "test-topic";
 
@@ -174,14 +195,14 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
                     assertEquals(10, messages.size());
 
-                    int cnt = 0;
+                    int count = 0;
 
                     for (TestMessage1 msg1 : messages) {
                         assertTrue(msg1.body().contains(TEST_BODY));
 
                         int i = Integer.parseInt(msg1.body().substring(TEST_BODY.length() + 1));
 
-                        assertEquals(cnt, i);
+                        assertEquals(count, i);
 
                         TestMessage2 msg2 = (TestMessage2) msg1.message();
 
@@ -193,11 +214,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
                         GridTestMessage msg3 = (GridTestMessage) msg2.message();
 
-                        assertEquals(cnt, msg3.getMsgId());
+                        assertEquals(count, msg3.getMsgId());
 
                         assertEquals(grid(1).localNode().id(), msg3.getSourceNodeId());
 
-                        cnt++;
+                        count++;
                     }
                 }
                 catch (Exception e) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54e5386..7940a63 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
 /**
  * Test for {@link TcpCommunicationSpi}
  */
-abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> {
+abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
     /** */
     private static final int SPI_COUNT = 3;
 
@@ -59,7 +59,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     }
 
     /** {@inheritDoc} */
-    @Override protected CommunicationSpi<Message> getSpi(int idx) {
+    @Override protected CommunicationSpi getSpi(int idx) {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
         if (!useShmem)
@@ -88,7 +88,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.testSendToManyNodes();
 
         // Test idle clients remove.
-        for (CommunicationSpi<Message> spi : spis.values()) {
+        for (CommunicationSpi spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             assertEquals(getSpiCount() - 1, clients.size());
@@ -129,13 +129,13 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
 
         final CyclicBarrier b = new CyclicBarrier(THREADS);
 
-        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+        List<IgniteInternalFuture> futs = new ArrayList<>();
 
         for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
             final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
 
-            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
+            futs.add(GridTestUtils.runAsync(new Callable() {
+                @Override public Object call() throws Exception {
                     List<ClusterNode> checkNodes = new ArrayList<>(nodes);
 
                     assert checkNodes.size() > 1;
@@ -156,7 +156,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
             }));
         }
 
-        for (IgniteInternalFuture<?> f : futs)
+        for (IgniteInternalFuture f : futs)
             f.get();
     }
 
@@ -164,7 +164,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        for (CommunicationSpi<Message> spi : spis.values()) {
+        for (CommunicationSpi spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
 
             for (int i = 0; i < 20; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 69e4bef..7ab0d6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -37,12 +37,13 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -64,7 +65,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi<Message>>
+public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi>
     extends GridSpiAbstractTest<T> {
     /** */
     private static final int SPI_CNT = 2;
@@ -100,7 +101,11 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
@@ -351,24 +356,24 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
-                    for (CommunicationSpi<?> spi : spis) {
+                    for (CommunicationSpi spi : spis) {
                         ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
                         assertEquals(1, clients.size());
 
-                        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+                        final GridNioServer srv = U.field(spi, "nioSrvr");
 
                         final int conns = pairedConnections ? 2 : 1;
 
                         GridTestUtils.waitForCondition(new GridAbsPredicate() {
                             @Override public boolean apply() {
-                                Collection<?> sessions = U.field(srv, "sessions");
+                                Collection sessions = U.field(srv, "sessions");
 
                                 return sessions.size() == conns * connectionsPerNode;
                             }
                         }, 5000);
 
-                        Collection<?> sessions = U.field(srv, "sessions");
+                        Collection sessions = U.field(srv, "sessions");
 
                         assertEquals(conns * connectionsPerNode, sessions.size());
                     }
@@ -391,7 +396,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /**
      * @return SPI.
      */
-    private CommunicationSpi<Message> createSpi() {
+    private CommunicationSpi createSpi() {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
         spi.setLocalAddress("127.0.0.1");
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index f99df2b..a53b43b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -38,13 +38,14 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,7 +100,11 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     private static boolean reject;
 
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index d99f48f..408eb10 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -27,13 +27,14 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -55,7 +56,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
@@ -75,7 +76,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
@@ -168,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                 final long totAcked0 = totAcked;
 
                 for (TcpCommunicationSpi spi : spis) {
-                    GridNioServer<?> srv = U.field(spi, "nioSrvr");
+                    GridNioServer srv = U.field(spi, "nioSrvr");
 
                     final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 
@@ -276,7 +281,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
         ClusterNode node0 = nodes.get(0);
         ClusterNode node1 = nodes.get(1);
 
-        final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
 
         int msgId = 0;
 
@@ -336,7 +341,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
-        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+        final GridNioServer srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 1d03590..5ec734a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,12 +32,13 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -60,7 +61,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
@@ -89,7 +90,11 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
@@ -669,7 +674,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi, boolean in) throws Exception {
-        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+        final GridNioServer srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index ef9b413..d937bb0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -58,7 +59,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi<Message>>
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
     extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -79,7 +80,11 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      *
      */
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
@@ -90,7 +95,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     }
 
     /** */
-    private static class TestListener implements CommunicationListener<Message> {
+    private class TestListener implements CommunicationListener<Message> {
         /** */
         private GridConcurrentHashSet<Long> msgIds = new GridConcurrentHashSet<>();
 
@@ -189,7 +194,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                 final long totAcked0 = totAcked;
 
                 for (TcpCommunicationSpi spi : spis) {
-                    GridNioServer<?> srv = U.field(spi, "nioSrvr");
+                    GridNioServer srv = U.field(spi, "nioSrvr");
 
                     Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 
@@ -301,7 +306,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
         // Check that session will not be closed by idle timeout because expected close by queue overflow.
         assertTrue(spi0.getIdleConnectionTimeout() > awaitTime);
 
-        final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
+        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
 
         // For prevent session close by write timeout.
         srv1.writeTimeout(60_000);
@@ -387,7 +392,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
-        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+        final GridNioServer srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 26fb56b..99840c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -17,21 +17,28 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -56,7 +63,11 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
     private final CountDownLatch latch = new CountDownLatch(1);
 
     static {
-        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
     }
 
     /**
@@ -106,9 +117,19 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
      * @param nodeIdx Node index.
      * @return MBean instance.
      */
-    private TcpCommunicationSpiMBean mbean(int nodeIdx) {
-        return getMxBean(getTestIgniteInstanceName(nodeIdx), "SPIs",
-            SynchronizedCommunicationSpi.class, TcpCommunicationSpiMBean.class);
+    private TcpCommunicationSpiMBean mbean(int nodeIdx) throws MalformedObjectNameException {
+        ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs",
+            SynchronizedCommunicationSpi.class.getSimpleName());
+
+        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+        if (mbeanServer.isRegistered(mbeanName))
+            return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class,
+                true);
+        else
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return null;
     }
 
     /**
@@ -138,10 +159,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
 
             latch.await(10, TimeUnit.SECONDS);
 
-            ClusterGroup clusterGrpNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+            ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
 
             // Send job from node0 to node1.
-            grid(0).compute(clusterGrpNode1).call(new IgniteCallable<Boolean>() {
+            grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
                 @Override public Boolean call() throws Exception {
                     return Boolean.TRUE;
                 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 6424bfc..da43ded 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -568,7 +567,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
         if (factory == null)
-            factory = new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+            factory = new GridIoMessageFactory(null);
 
         return factory;
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5876bee..3374928 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -47,9 +47,7 @@ import org.apache.ignite.internal.managers.communication.IgniteCommunicationBala
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationSslBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImplTest;
 import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
-import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConflictTest;
 import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
@@ -353,9 +351,6 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationBalanceMultipleConnectionsTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationSslBalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
-        GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
-        GridTestUtils.addTestIfNeeded(suite, IgniteMessageFactoryImplTest.class, ignoredTests);
-        GridTestUtils.addTestIfNeeded(suite, MessageDirectTypeIdConflictTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index 0720b4a..ce04839 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -21,9 +21,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -124,8 +122,7 @@ public class GridMessageCollectionTest {
 
         assertEquals(m.directType(), type);
 
-        IgniteMessageFactory msgFactory =
-                new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+        GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
 
         Message mx = msgFactory.create(type);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index e4aae7d..0ff53f7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,52 +23,107 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * H2 Value message factory.
  */
-public class GridH2ValueMessageFactory implements MessageFactoryProvider {
+public class GridH2ValueMessageFactory implements MessageFactory {
     /** {@inheritDoc} */
-    @Override public void registerAll(IgniteMessageFactory factory) {
-        factory.register((short)-4, () -> GridH2Null.INSTANCE);
-        factory.register((short)-5, GridH2Boolean::new);
-        factory.register((short)-6, GridH2Byte::new);
-        factory.register((short)-7, GridH2Short::new);
-        factory.register((short)-8, GridH2Integer::new);
-        factory.register((short)-9, GridH2Long::new);
-        factory.register((short)-10, GridH2Decimal::new);
-        factory.register((short)-11, GridH2Double::new);
-        factory.register((short)-12, GridH2Float::new);
-        factory.register((short)-13, GridH2Time::new);
-        factory.register((short)-14, GridH2Date::new);
-        factory.register((short)-15, GridH2Timestamp::new);
-        factory.register((short)-16, GridH2Bytes::new);
-        factory.register((short)-17, GridH2String::new);
-        factory.register((short)-18, GridH2Array::new);
-        factory.register((short)-19, GridH2JavaObject::new);
-        factory.register((short)-20, GridH2Uuid::new);
-        factory.register((short)-21, GridH2Geometry::new);
-        factory.register((short)-22, GridH2CacheObject::new);
-        factory.register((short)-30, GridH2IndexRangeRequest::new);
-        factory.register((short)-31, GridH2IndexRangeResponse::new);
-        factory.register((short)-32, GridH2RowMessage::new);
-        factory.register((short)-33, GridH2QueryRequest::new);
-        factory.register((short)-34, GridH2RowRange::new);
-        factory.register((short)-35, GridH2RowRangeBounds::new);
-        factory.register((short)-54, QueryTable::new);
-        factory.register((short)-55, GridH2DmlRequest::new);
-        factory.register((short)-56, GridH2DmlResponse::new);
-        factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
-    }
+    @Nullable @Override public Message create(short type) {
+        switch (type) {
+            case -4:
+                return GridH2Null.INSTANCE;
 
-    /** {@inheritDoc} */
-    @Override @Nullable public Message create(short type) {
-        throw new UnsupportedOperationException();
+            case -5:
+                return new GridH2Boolean();
+
+            case -6:
+                return new GridH2Byte();
+
+            case -7:
+                return new GridH2Short();
+
+            case -8:
+                return new GridH2Integer();
+
+            case -9:
+                return new GridH2Long();
+
+            case -10:
+                return new GridH2Decimal();
+
+            case -11:
+                return new GridH2Double();
+
+            case -12:
+                return new GridH2Float();
+
+            case -13:
+                return new GridH2Time();
+
+            case -14:
+                return new GridH2Date();
+
+            case -15:
+                return new GridH2Timestamp();
+
+            case -16:
+                return new GridH2Bytes();
+
+            case -17:
+                return new GridH2String();
+
+            case -18:
+                return new GridH2Array();
+
+            case -19:
+                return new GridH2JavaObject();
+
+            case -20:
+                return new GridH2Uuid();
+
+            case -21:
+                return new GridH2Geometry();
+
+            case -22:
+                return new GridH2CacheObject();
+
+            case -30:
+                return new GridH2IndexRangeRequest();
+
+            case -31:
+                return new GridH2IndexRangeResponse();
+
+            case -32:
+                return new GridH2RowMessage();
+
+            case -33:
+                return new GridH2QueryRequest();
+
+            case -34:
+                return new GridH2RowRange();
+
+            case -35:
+                return new GridH2RowRangeBounds();
+
+            case -54:
+                return new QueryTable();
+
+            case -55:
+                return new GridH2DmlRequest();
+
+            case -56:
+                return new GridH2DmlResponse();
+
+            case -57:
+                return new GridH2SelectForUpdateTxDetails();
+        }
+
+        return null;
     }
 
     /**