You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/02/04 15:15:59 UTC

[ignite] branch master updated: IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 65c30ec  IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type
65c30ec is described below

commit 65c30ec6947d35f18b620a7fc9fd0f6d5774317c
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Mon Jan 27 17:49:32 2020 +0300

    IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type
---
 .../managers/communication/GridIoManager.java      |    4 +-
 .../communication/GridIoMessageFactory.java        | 1203 ++++----------------
 .../communication/IgniteMessageFactoryImpl.java    |  166 +++
 .../communication/IgniteMessageFactory.java        |   39 +
 .../extensions/communication/MessageFactory.java   |    3 +
 ...ageFactory.java => MessageFactoryProvider.java} |   31 +-
 .../tcp/TcpCommunicationMetricsListener.java       |   16 +-
 .../org.apache.ignite.plugin.PluginProvider        |    1 +
 .../GridManagerLocalMessageListenerSelfTest.java   |   14 +-
 .../GridCommunicationSendMessageSelfTest.java      |   19 +-
 .../IgniteMessageFactoryImplTest.java              |  199 ++++
 .../MessageDirectTypeIdConflictTest.java           |  209 ++++
 .../GridCacheConditionalDeploymentSelfTest.java    |   22 +-
 ...niteCacheContinuousQueryImmutableEntryTest.java |    8 +-
 .../GridAbstractCommunicationSelfTest.java         |   17 +-
 .../communication/GridCacheMessageSelfTest.java    |   61 +-
 .../tcp/GridTcpCommunicationSpiAbstractTest.java   |   16 +-
 ...pCommunicationSpiConcurrentConnectSelfTest.java |   21 +-
 ...idTcpCommunicationSpiMultithreadedSelfTest.java |    9 +-
 ...GridTcpCommunicationSpiRecoveryAckSelfTest.java |   17 +-
 .../GridTcpCommunicationSpiRecoverySelfTest.java   |   13 +-
 ...TcpCommunicationRecoveryAckClosureSelfTest.java |   19 +-
 .../tcp/TcpCommunicationStatisticsTest.java        |   19 +-
 .../ignite/testframework/GridSpiTestContext.java   |    3 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java    |    5 +
 .../ignite/util/GridMessageCollectionTest.java     |    5 +-
 .../h2/twostep/msg/GridH2ValueMessageFactory.java  |  129 +--
 27 files changed, 986 insertions(+), 1282 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9821edf..e8aeb53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -498,6 +498,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         List<MessageFactory> compMsgs = new ArrayList<>();
 
+        compMsgs.add(new GridIoMessageFactory());
+
         for (IgniteComponentType compType : IgniteComponentType.values()) {
             MessageFactory f = compType.messageFactory();
 
@@ -508,7 +510,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (!compMsgs.isEmpty())
             msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
 
-        msgFactory = new GridIoMessageFactory(msgs);
+        msgFactory = new IgniteMessageFactoryImpl(msgs);
 
         if (log.isDebugEnabled())
             log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e82bb26..0d7976e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.managers.communication;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridJobCancelRequest;
 import org.apache.ignite.internal.GridJobExecuteRequest;
 import org.apache.ignite.internal.GridJobExecuteResponse;
@@ -196,9 +193,9 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridMessageCollection;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
@@ -210,1005 +207,209 @@ import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMess
 /**
  * Message factory implementation.
  */
-public class GridIoMessageFactory implements MessageFactory {
-    /** Custom messages registry. Used for test purposes. */
-    private static final Map<Short, IgniteOutClosure<Message>> CUSTOM = new ConcurrentHashMap<>();
-
-    /** Extensions. */
-    private final MessageFactory[] ext;
-
-    /**
-     * @param ext Extensions.
-     */
-    public GridIoMessageFactory(MessageFactory[] ext) {
-        this.ext = ext;
+public class GridIoMessageFactory implements MessageFactoryProvider {
+    /** {@inheritDoc} */
+    @Override public void registerAll(IgniteMessageFactory factory) {
+        // -54 is reserved for SQL.
+        // -46 ... -51 - snapshot messages.
+        factory.register((short)-61, IgniteDiagnosticMessage::new);
+        factory.register((short)-53, SchemaOperationStatusMessage::new);
+        factory.register((short)-52, GridIntList::new);
+        factory.register((short)-51, NearCacheUpdates::new);
+        factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
+        factory.register((short)-49, UpdateErrors::new);
+        factory.register((short)-48, GridDhtAtomicNearResponse::new);
+        factory.register((short)-45, GridChangeGlobalStateMessageResponse::new);
+        factory.register((short)-44, HandshakeMessage2::new);
+        factory.register((short)-43, IgniteIoTestMessage::new);
+        factory.register((short)-42, HadoopDirectShuffleMessage::new);
+        factory.register((short)-41, HadoopShuffleFinishResponse::new);
+        factory.register((short)-40, HadoopShuffleFinishRequest::new);
+        factory.register((short)-39, HadoopJobId::new);
+        factory.register((short)-38, HadoopShuffleAck::new);
+        factory.register((short)-37, HadoopShuffleMessage::new);
+        factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
+        factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new);
+        factory.register((short)-26, TxLockList::new);
+        factory.register((short)-25, TxLock::new);
+        factory.register((short)-24, TxLocksRequest::new);
+        factory.register((short)-23, TxLocksResponse::new);
+        factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, NodeIdMessage::new);
+        factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, RecoveryLastReceivedMessage::new);
+        factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new);
+        factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new);
+        factory.register((short)0, GridJobCancelRequest::new);
+        factory.register((short)1, GridJobExecuteRequest::new);
+        factory.register((short)2, GridJobExecuteResponse::new);
+        factory.register((short)3, GridJobSiblingsRequest::new);
+        factory.register((short)4, GridJobSiblingsResponse::new);
+        factory.register((short)5, GridTaskCancelRequest::new);
+        factory.register((short)6, GridTaskSessionRequest::new);
+        factory.register((short)7, GridCheckpointRequest::new);
+        factory.register((short)8, GridIoMessage::new);
+        factory.register((short)9, GridIoUserMessage::new);
+        factory.register((short)10, GridDeploymentInfoBean::new);
+        factory.register((short)11, GridDeploymentRequest::new);
+        factory.register((short)12, GridDeploymentResponse::new);
+        factory.register((short)13, GridEventStorageMessage::new);
+        factory.register((short)16, GridCacheTxRecoveryRequest::new);
+        factory.register((short)17, GridCacheTxRecoveryResponse::new);
+        factory.register((short)20, GridCacheTtlUpdateRequest::new);
+        factory.register((short)21, GridDistributedLockRequest::new);
+        factory.register((short)22, GridDistributedLockResponse::new);
+        factory.register((short)23, GridDistributedTxFinishRequest::new);
+        factory.register((short)24, GridDistributedTxFinishResponse::new);
+        factory.register((short)25, GridDistributedTxPrepareRequest::new);
+        factory.register((short)26, GridDistributedTxPrepareResponse::new);
+        factory.register((short)27, GridDistributedUnlockRequest::new);
+        factory.register((short)28, GridDhtAffinityAssignmentRequest::new);
+        factory.register((short)29, GridDhtAffinityAssignmentResponse::new);
+        factory.register((short)30, GridDhtLockRequest::new);
+        factory.register((short)31, GridDhtLockResponse::new);
+        factory.register((short)32, GridDhtTxFinishRequest::new);
+        factory.register((short)33, GridDhtTxFinishResponse::new);
+        factory.register((short)34, GridDhtTxPrepareRequest::new);
+        factory.register((short)35, GridDhtTxPrepareResponse::new);
+        factory.register((short)36, GridDhtUnlockRequest::new);
+        factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
+        factory.register((short)38, GridDhtAtomicUpdateRequest::new);
+        factory.register((short)39, GridDhtAtomicUpdateResponse::new);
+        factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
+        factory.register((short)41, GridNearAtomicUpdateResponse::new);
+        factory.register((short)42, GridDhtForceKeysRequest::new);
+        factory.register((short)43, GridDhtForceKeysResponse::new);
+        factory.register((short)44, GridDhtPartitionDemandLegacyMessage::new);
+        factory.register((short)45, GridDhtPartitionDemandMessage::new);
+        factory.register((short)46, GridDhtPartitionsFullMessage::new);
+        factory.register((short)47, GridDhtPartitionsSingleMessage::new);
+        factory.register((short)48, GridDhtPartitionsSingleRequest::new);
+        factory.register((short)49, GridNearGetRequest::new);
+        factory.register((short)50, GridNearGetResponse::new);
+        factory.register((short)51, GridNearLockRequest::new);
+        factory.register((short)52, GridNearLockResponse::new);
+        factory.register((short)53, GridNearTxFinishRequest::new);
+        factory.register((short)54, GridNearTxFinishResponse::new);
+        factory.register((short)55, GridNearTxPrepareRequest::new);
+        factory.register((short)56, GridNearTxPrepareResponse::new);
+        factory.register((short)57, GridNearUnlockRequest::new);
+        factory.register((short)58, GridCacheQueryRequest::new);
+        factory.register((short)59, GridCacheQueryResponse::new);
+        factory.register((short)61, GridContinuousMessage::new);
+        factory.register((short)62, DataStreamerRequest::new);
+        factory.register((short)63, DataStreamerResponse::new);
+        factory.register((short)64, IgfsAckMessage::new);
+        factory.register((short)65, IgfsBlockKey::new);
+        factory.register((short)66, IgfsBlocksMessage::new);
+        factory.register((short)67, IgfsDeleteMessage::new);
+        factory.register((short)68, IgfsFileAffinityRange::new);
+        factory.register((short)69, IgfsFragmentizerRequest::new);
+        factory.register((short)70, IgfsFragmentizerResponse::new);
+        factory.register((short)71, IgfsSyncMessage::new);
+        factory.register((short)76, GridTaskResultRequest::new);
+        factory.register((short)77, GridTaskResultResponse::new);
+        factory.register((short)78, MissingMappingRequestMessage::new);
+        factory.register((short)79, MissingMappingResponseMessage::new);
+        factory.register((short)80, MetadataRequestMessage::new);
+        factory.register((short)81, MetadataResponseMessage::new);
+        factory.register((short)82, JobStealingRequest::new);
+        factory.register((short)84, GridByteArrayList::new);
+        factory.register((short)85, GridLongList::new);
+        factory.register((short)86, GridCacheVersion::new);
+        factory.register((short)87, GridDhtPartitionExchangeId::new);
+        factory.register((short)88, GridCacheReturn::new);
+        factory.register((short)89, CacheObjectImpl::new);
+        factory.register((short)90, KeyCacheObjectImpl::new);
+        factory.register((short)91, GridCacheEntryInfo::new);
+        factory.register((short)92, CacheEntryInfoCollection::new);
+        factory.register((short)93, CacheInvokeDirectResult::new);
+        factory.register((short)94, IgniteTxKey::new);
+        factory.register((short)95, DataStreamerEntry::new);
+        factory.register((short)96, CacheContinuousQueryEntry::new);
+        factory.register((short)97, CacheEvictionEntry::new);
+        factory.register((short)98, CacheEntryPredicateContainsValue::new);
+        factory.register((short)99, CacheEntrySerializablePredicate::new);
+        factory.register((short)100, IgniteTxEntry::new);
+        factory.register((short)101, TxEntryValueHolder::new);
+        factory.register((short)102, CacheVersionedValue::new);
+        factory.register((short)103, GridCacheRawVersionedEntry::new);
+        factory.register((short)104, GridCacheVersionEx::new);
+        factory.register((short)105, CacheObjectByteArrayImpl::new);
+        factory.register((short)106, GridQueryCancelRequest::new);
+        factory.register((short)107, GridQueryFailResponse::new);
+        factory.register((short)108, GridQueryNextPageRequest::new);
+        factory.register((short)109, GridQueryNextPageResponse::new);
+        factory.register((short)111, AffinityTopologyVersion::new);
+        factory.register((short)112, GridCacheSqlQuery::new);
+        factory.register((short)113, BinaryObjectImpl::new);
+        factory.register((short)114, GridDhtPartitionSupplyMessage::new);
+        factory.register((short)115, UUIDCollectionMessage::new);
+        factory.register((short)116, GridNearSingleGetRequest::new);
+        factory.register((short)117, GridNearSingleGetResponse::new);
+        factory.register((short)118, CacheContinuousQueryBatchAck::new);
+        factory.register((short)119, BinaryEnumObjectImpl::new);
+
+        // [120..123] - DR
+        factory.register((short)124, GridMessageCollection::new);
+        factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
+        factory.register((short)126, GridNearAtomicSingleUpdateInvokeRequest::new);
+        factory.register((short)127, GridNearAtomicSingleUpdateFilterRequest::new);
+        factory.register((short)128, CacheGroupAffinityMessage::new);
+        factory.register((short)129, WalStateAckMessage::new);
+        factory.register((short)130, UserManagementOperationFinishedMessage::new);
+        factory.register((short)131, UserAuthenticateRequestMessage::new);
+        factory.register((short)132, UserAuthenticateResponseMessage::new);
+        factory.register((short)133, ClusterMetricsUpdateMessage::new);
+        factory.register((short)134, ContinuousRoutineStartResultMessage::new);
+        factory.register((short)135, LatchAckMessage::new);
+        factory.register((short)136, MvccTxSnapshotRequest::new);
+        factory.register((short)137, MvccAckRequestTx::new);
+        factory.register((short)138, MvccFutureResponse::new);
+        factory.register((short)139, MvccQuerySnapshotRequest::new);
+        factory.register((short)140, MvccAckRequestQueryCntr::new);
+        factory.register((short)141, MvccSnapshotResponse::new);
+        factory.register((short)143, GridCacheMvccEntryInfo::new);
+        factory.register((short)144, GridDhtTxQueryEnlistResponse::new);
+        factory.register((short)145, MvccAckRequestQueryId::new);
+        factory.register((short)146, MvccAckRequestTxAndQueryCntr::new);
+        factory.register((short)147, MvccAckRequestTxAndQueryId::new);
+        factory.register((short)148, MvccVersionImpl::new);
+        factory.register((short)149, MvccActiveQueriesMessage::new);
+        factory.register((short)150, MvccSnapshotWithoutTxs::new);
+        factory.register((short)151, GridNearTxQueryEnlistRequest::new);
+        factory.register((short)152, GridNearTxQueryEnlistResponse::new);
+        factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new);
+        factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new);
+        factory.register((short)155, GridDhtTxQueryEnlistRequest::new);
+        factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new);
+        factory.register((short)157, PartitionUpdateCountersMessage::new);
+        factory.register((short)158, GridDhtPartitionSupplyMessageV2::new);
+        factory.register((short)159, GridNearTxEnlistRequest::new);
+        factory.register((short)160, GridNearTxEnlistResponse::new);
+        factory.register((short)161, GridInvokeValue::new);
+        factory.register((short)162, GenerateEncryptionKeyRequest::new);
+        factory.register((short)163, GenerateEncryptionKeyResponse::new);
+        factory.register((short)164, MvccRecoveryFinishedMessage::new);
+        factory.register((short)165, PartitionCountersNeighborcastRequest::new);
+        factory.register((short)166, PartitionCountersNeighborcastResponse::new);
+        factory.register((short)167, ServiceDeploymentProcessId::new);
+        factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new);
+        factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
+        factory.register((short)170, DeadlockProbe::new);
+        factory.register((short)171, ProbedTx::new);
+        factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new);
+        factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new);
+        factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
+        factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
+        factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
+
+        // [-3..119] [124..129] [-23..-28] [-36..-55] - this factory (GridIoMessageFactory)
+        // [120..123] - DR
+        // [-4..-22, -30..-35] - SQL
+        // [2048..2053] - Snapshots
     }
 
     /** {@inheritDoc} */
     @Override public Message create(short type) {
-        Message msg = null;
-
-        switch (type) {
-            // -54 is reserved for SQL.
-            // -46 ... -51 - snapshot messages.
-            case -61:
-                msg = new IgniteDiagnosticMessage();
-
-                break;
-
-            case -53:
-                msg = new SchemaOperationStatusMessage();
-
-                break;
-
-            case -52:
-                msg = new GridIntList();
-
-                break;
-
-            case -51:
-                msg = new NearCacheUpdates();
-
-                break;
-
-            case -50:
-                msg = new GridNearAtomicCheckUpdateRequest();
-
-                break;
-
-            case -49:
-                msg = new UpdateErrors();
-
-                break;
-
-            case -48:
-                msg = new GridDhtAtomicNearResponse();
-
-                break;
-
-            case -45:
-                msg = new GridChangeGlobalStateMessageResponse();
-
-                break;
-
-            case -44:
-                msg = new HandshakeMessage2();
-
-                break;
-
-            case -43:
-                msg = new IgniteIoTestMessage();
-
-                break;
-
-            case -42:
-                msg = new HadoopDirectShuffleMessage();
-
-                break;
-
-            case -41:
-                msg = new HadoopShuffleFinishResponse();
-
-                break;
-
-            case -40:
-                msg = new HadoopShuffleFinishRequest();
-
-                break;
-
-            case -39:
-                msg = new HadoopJobId();
-
-                break;
-
-            case -38:
-                msg = new HadoopShuffleAck();
-
-                break;
-
-            case -37:
-                msg = new HadoopShuffleMessage();
-
-                break;
-
-            case -36:
-                msg = new GridDhtAtomicSingleUpdateRequest();
-
-                break;
-
-            case -27:
-                msg = new GridDhtTxOnePhaseCommitAckRequest();
-
-                break;
-
-            case -26:
-                msg = new TxLockList();
-
-                break;
-
-            case -25:
-                msg = new TxLock();
-
-                break;
-
-            case -24:
-                msg = new TxLocksRequest();
-
-                break;
-
-            case -23:
-                msg = new TxLocksResponse();
-
-                break;
-
-            case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
-                msg = new NodeIdMessage();
-
-                break;
-
-            case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
-                msg = new RecoveryLastReceivedMessage();
-
-                break;
-
-            case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
-                msg = new HandshakeMessage();
-
-                break;
-
-            case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE:
-                msg = new HandshakeWaitMessage();
-
-                break;
-
-            case 0:
-                msg = new GridJobCancelRequest();
-
-                break;
-
-            case 1:
-                msg = new GridJobExecuteRequest();
-
-                break;
-
-            case 2:
-                msg = new GridJobExecuteResponse();
-
-                break;
-
-            case 3:
-                msg = new GridJobSiblingsRequest();
-
-                break;
-
-            case 4:
-                msg = new GridJobSiblingsResponse();
-
-                break;
-
-            case 5:
-                msg = new GridTaskCancelRequest();
-
-                break;
-
-            case 6:
-                msg = new GridTaskSessionRequest();
-
-                break;
-
-            case 7:
-                msg = new GridCheckpointRequest();
-
-                break;
-
-            case 8:
-                msg = new GridIoMessage();
-
-                break;
-
-            case 9:
-                msg = new GridIoUserMessage();
-
-                break;
-
-            case 10:
-                msg = new GridDeploymentInfoBean();
-
-                break;
-
-            case 11:
-                msg = new GridDeploymentRequest();
-
-                break;
-
-            case 12:
-                msg = new GridDeploymentResponse();
-
-                break;
-
-            case 13:
-                msg = new GridEventStorageMessage();
-
-                break;
-
-            case 16:
-                msg = new GridCacheTxRecoveryRequest();
-
-                break;
-
-            case 17:
-                msg = new GridCacheTxRecoveryResponse();
-
-                break;
-
-            case 20:
-                msg = new GridCacheTtlUpdateRequest();
-
-                break;
-
-            case 21:
-                msg = new GridDistributedLockRequest();
-
-                break;
-
-            case 22:
-                msg = new GridDistributedLockResponse();
-
-                break;
-
-            case 23:
-                msg = new GridDistributedTxFinishRequest();
-
-                break;
-
-            case 24:
-                msg = new GridDistributedTxFinishResponse();
-
-                break;
-
-            case 25:
-                msg = new GridDistributedTxPrepareRequest();
-
-                break;
-
-            case 26:
-                msg = new GridDistributedTxPrepareResponse();
-
-                break;
-
-            case 27:
-                msg = new GridDistributedUnlockRequest();
-
-                break;
-
-            case 28:
-                msg = new GridDhtAffinityAssignmentRequest();
-
-                break;
-
-            case 29:
-                msg = new GridDhtAffinityAssignmentResponse();
-
-                break;
-
-            case 30:
-                msg = new GridDhtLockRequest();
-
-                break;
-
-            case 31:
-                msg = new GridDhtLockResponse();
-
-                break;
-
-            case 32:
-                msg = new GridDhtTxFinishRequest();
-
-                break;
-
-            case 33:
-                msg = new GridDhtTxFinishResponse();
-
-                break;
-
-            case 34:
-                msg = new GridDhtTxPrepareRequest();
-
-                break;
-
-            case 35:
-                msg = new GridDhtTxPrepareResponse();
-
-                break;
-
-            case 36:
-                msg = new GridDhtUnlockRequest();
-
-                break;
-
-            case 37:
-                msg = new GridDhtAtomicDeferredUpdateResponse();
-
-                break;
-
-            case 38:
-                msg = new GridDhtAtomicUpdateRequest();
-
-                break;
-
-            case 39:
-                msg = new GridDhtAtomicUpdateResponse();
-
-                break;
-
-            case 40:
-                msg = new GridNearAtomicFullUpdateRequest();
-
-                break;
-
-            case 41:
-                msg = new GridNearAtomicUpdateResponse();
-
-                break;
-
-            case 42:
-                msg = new GridDhtForceKeysRequest();
-
-                break;
-
-            case 43:
-                msg = new GridDhtForceKeysResponse();
-
-                break;
-
-            case 44:
-                msg = new GridDhtPartitionDemandLegacyMessage();
-
-                break;
-
-            case 45:
-                msg = new GridDhtPartitionDemandMessage();
-
-                break;
-
-            case 46:
-                msg = new GridDhtPartitionsFullMessage();
-
-                break;
-
-            case 47:
-                msg = new GridDhtPartitionsSingleMessage();
-
-                break;
-
-            case 48:
-                msg = new GridDhtPartitionsSingleRequest();
-
-                break;
-
-            case 49:
-                msg = new GridNearGetRequest();
-
-                break;
-
-            case 50:
-                msg = new GridNearGetResponse();
-
-                break;
-
-            case 51:
-                msg = new GridNearLockRequest();
-
-                break;
-
-            case 52:
-                msg = new GridNearLockResponse();
-
-                break;
-
-            case 53:
-                msg = new GridNearTxFinishRequest();
-
-                break;
-
-            case 54:
-                msg = new GridNearTxFinishResponse();
-
-                break;
-
-            case 55:
-                msg = new GridNearTxPrepareRequest();
-
-                break;
-
-            case 56:
-                msg = new GridNearTxPrepareResponse();
-
-                break;
-
-            case 57:
-                msg = new GridNearUnlockRequest();
-
-                break;
-
-            case 58:
-                msg = new GridCacheQueryRequest();
-
-                break;
-
-            case 59:
-                msg = new GridCacheQueryResponse();
-
-                break;
-
-            case 61:
-                msg = new GridContinuousMessage();
-
-                break;
-
-            case 62:
-                msg = new DataStreamerRequest();
-
-                break;
-
-            case 63:
-                msg = new DataStreamerResponse();
-
-                break;
-
-            case 64:
-                msg = new IgfsAckMessage();
-
-                break;
-
-            case 65:
-                msg = new IgfsBlockKey();
-
-                break;
-
-            case 66:
-                msg = new IgfsBlocksMessage();
-
-                break;
-
-            case 67:
-                msg = new IgfsDeleteMessage();
-
-                break;
-
-            case 68:
-                msg = new IgfsFileAffinityRange();
-
-                break;
-
-            case 69:
-                msg = new IgfsFragmentizerRequest();
-
-                break;
-
-            case 70:
-                msg = new IgfsFragmentizerResponse();
-
-                break;
-
-            case 71:
-                msg = new IgfsSyncMessage();
-
-                break;
-
-            case 76:
-                msg = new GridTaskResultRequest();
-
-                break;
-
-            case 77:
-                msg = new GridTaskResultResponse();
-
-                break;
-
-            case 78:
-                msg = new MissingMappingRequestMessage();
-
-                break;
-
-            case 79:
-                msg = new MissingMappingResponseMessage();
-
-                break;
-
-            case 80:
-                msg = new MetadataRequestMessage();
-
-                break;
-
-            case 81:
-                msg = new MetadataResponseMessage();
-
-                break;
-
-            case 82:
-                msg = new JobStealingRequest();
-
-                break;
-
-            case 84:
-                msg = new GridByteArrayList();
-
-                break;
-
-            case 85:
-                msg = new GridLongList();
-
-                break;
-
-            case 86:
-                msg = new GridCacheVersion();
-
-                break;
-
-            case 87:
-                msg = new GridDhtPartitionExchangeId();
-
-                break;
-
-            case 88:
-                msg = new GridCacheReturn();
-
-                break;
-
-            case 89:
-                msg = new CacheObjectImpl();
-
-                break;
-
-            case 90:
-                msg = new KeyCacheObjectImpl();
-
-                break;
-
-            case 91:
-                msg = new GridCacheEntryInfo();
-
-                break;
-
-            case 92:
-                msg = new CacheEntryInfoCollection();
-
-                break;
-
-            case 93:
-                msg = new CacheInvokeDirectResult();
-
-                break;
-
-            case 94:
-                msg = new IgniteTxKey();
-
-                break;
-
-            case 95:
-                msg = new DataStreamerEntry();
-
-                break;
-
-            case 96:
-                msg = new CacheContinuousQueryEntry();
-
-                break;
-
-            case 97:
-                msg = new CacheEvictionEntry();
-
-                break;
-
-            case 98:
-                msg = new CacheEntryPredicateContainsValue();
-
-                break;
-
-            case 99:
-                msg = new CacheEntrySerializablePredicate();
-
-                break;
-
-            case 100:
-                msg = new IgniteTxEntry();
-
-                break;
-
-            case 101:
-                msg = new TxEntryValueHolder();
-
-                break;
-
-            case 102:
-                msg = new CacheVersionedValue();
-
-                break;
-
-            case 103:
-                msg = new GridCacheRawVersionedEntry<>();
-
-                break;
-
-            case 104:
-                msg = new GridCacheVersionEx();
-
-                break;
-
-            case 105:
-                msg = new CacheObjectByteArrayImpl();
-
-                break;
-
-            case 106:
-                msg = new GridQueryCancelRequest();
-
-                break;
-
-            case 107:
-                msg = new GridQueryFailResponse();
-
-                break;
-
-            case 108:
-                msg = new GridQueryNextPageRequest();
-
-                break;
-
-            case 109:
-                msg = new GridQueryNextPageResponse();
-
-                break;
-
-            case 110:
-                // EMPTY type
-                // GridQueryRequest was removed
-                break;
-
-            case 111:
-                msg = new AffinityTopologyVersion();
-
-                break;
-
-            case 112:
-                msg = new GridCacheSqlQuery();
-
-                break;
-
-            case 113:
-                msg = new BinaryObjectImpl();
-
-                break;
-
-            case 114:
-                msg = new GridDhtPartitionSupplyMessage();
-
-                break;
-
-            case 115:
-                msg = new UUIDCollectionMessage();
-
-                break;
-
-            case 116:
-                msg = new GridNearSingleGetRequest();
-
-                break;
-
-            case 117:
-                msg = new GridNearSingleGetResponse();
-
-                break;
-
-            case 118:
-                msg = new CacheContinuousQueryBatchAck();
-
-                break;
-
-            case 119:
-                msg = new BinaryEnumObjectImpl();
-
-                break;
-
-            // [120..123] - DR
-            case 124:
-                msg = new GridMessageCollection<>();
-
-                break;
-
-            case 125:
-                msg = new GridNearAtomicSingleUpdateRequest();
-
-                break;
-
-            case 126:
-                msg = new GridNearAtomicSingleUpdateInvokeRequest();
-
-                break;
-
-            case 127:
-                msg = new GridNearAtomicSingleUpdateFilterRequest();
-
-                break;
-
-            case 128:
-                msg = new CacheGroupAffinityMessage();
-
-                break;
-
-            case 129:
-                msg = new WalStateAckMessage();
-
-                break;
-
-            case 130:
-                msg = new UserManagementOperationFinishedMessage();
-
-                break;
-
-            case 131:
-                msg = new UserAuthenticateRequestMessage();
-
-                break;
-
-            case 132:
-                msg = new UserAuthenticateResponseMessage();
-
-                break;
-
-            case 133:
-                msg = new ClusterMetricsUpdateMessage();
-
-                break;
-
-            case 134:
-                msg = new ContinuousRoutineStartResultMessage();
-
-                break;
-
-            case 135:
-                msg = new LatchAckMessage();
-
-                break;
-
-            case 136:
-                msg = new MvccTxSnapshotRequest();
-
-                break;
-
-            case 137:
-                msg = new MvccAckRequestTx();
-
-                break;
-
-            case 138:
-                msg = new MvccFutureResponse();
-
-                break;
-
-            case 139:
-                msg = new MvccQuerySnapshotRequest();
-
-                break;
-
-            case 140:
-                msg = new MvccAckRequestQueryCntr();
-
-                break;
-
-            case 141:
-                msg = new MvccSnapshotResponse();
-
-                break;
-
-            case 143:
-                msg = new GridCacheMvccEntryInfo();
-
-                break;
-
-            case 144:
-                msg = new GridDhtTxQueryEnlistResponse();
-
-                break;
-
-            case 145:
-                msg = new MvccAckRequestQueryId();
-
-                break;
-
-            case 146:
-                msg = new MvccAckRequestTxAndQueryCntr();
-
-                break;
-
-            case 147:
-                msg = new MvccAckRequestTxAndQueryId();
-
-                break;
-
-            case 148:
-                msg = new MvccVersionImpl();
-
-                break;
-
-            case 149:
-                msg = new MvccActiveQueriesMessage();
-
-                break;
-
-            case 150:
-                msg = new MvccSnapshotWithoutTxs();
-
-                break;
-
-            case 151:
-                msg = new GridNearTxQueryEnlistRequest();
-
-                break;
-
-            case 152:
-                msg = new GridNearTxQueryEnlistResponse();
-
-                break;
-
-            case 153:
-                msg = new GridNearTxQueryResultsEnlistRequest();
-
-                break;
-
-            case 154:
-                msg = new GridNearTxQueryResultsEnlistResponse();
-
-                break;
-
-            case 155:
-                msg = new GridDhtTxQueryEnlistRequest();
-
-                break;
-
-            case 156:
-                msg = new GridDhtTxQueryFirstEnlistRequest();
-
-                break;
-
-            case 157:
-                msg = new PartitionUpdateCountersMessage();
-
-                break;
-
-            case 158:
-                msg = new GridDhtPartitionSupplyMessageV2();
-
-                break;
-
-            case 159:
-                msg = new GridNearTxEnlistRequest();
-
-                break;
-
-            case 160:
-                msg = new GridNearTxEnlistResponse();
-
-                break;
-
-            case 161:
-                msg = new GridInvokeValue();
-
-                break;
-
-            case 162:
-                msg = new GenerateEncryptionKeyRequest();
-
-                break;
-
-            case 163:
-                msg = new GenerateEncryptionKeyResponse();
-
-                break;
-
-            case 164:
-                msg = new MvccRecoveryFinishedMessage();
-
-                break;
-
-            case 165:
-                msg = new PartitionCountersNeighborcastRequest();
-
-                break;
-
-            case 166:
-                msg = new PartitionCountersNeighborcastResponse();
-
-                break;
-
-            case 167:
-                msg = new ServiceDeploymentProcessId();
-
-                break;
-
-            case 168:
-                msg = new ServiceSingleNodeDeploymentResultBatch();
-
-                break;
-
-            case 169:
-                msg = new ServiceSingleNodeDeploymentResult();
-
-                break;
-
-            case 170:
-                msg = new DeadlockProbe();
-
-                break;
-
-            case 171:
-                msg = new ProbedTx();
-
-                break;
-
-            case GridQueryKillRequest.TYPE_CODE:
-                msg = new GridQueryKillRequest();
-
-                break;
-
-            case GridQueryKillResponse.TYPE_CODE:
-                msg = new GridQueryKillResponse();
-
-                break;
-
-            case GridIoSecurityAwareMessage.TYPE_CODE:
-                msg = new GridIoSecurityAwareMessage();
-
-                break;
-
-            case SessionChannelMessage.TYPE_CODE:
-                msg = new SessionChannelMessage();
-
-                break;
-
-            case SingleNodeMessage.TYPE_CODE:
-                msg = new SingleNodeMessage<>();
-
-                break;
-
-            // [-3..119] [124..129] [-23..-28] [-36..-55] - this
-            // [120..123] - DR
-            // [-4..-22, -30..-35] - SQL
-            // [2048..2053] - Snapshots
-            default:
-                if (ext != null) {
-                    for (MessageFactory factory : ext) {
-                        msg = factory.create(type);
-
-                        if (msg != null)
-                            break;
-                    }
-                }
-
-                if (msg == null) {
-                    IgniteOutClosure<Message> c = CUSTOM.get(type);
-
-                    if (c != null)
-                        msg = c.apply();
-                }
-        }
-
-        if (msg == null)
-            throw new IgniteException("Invalid message type: " + type);
-
-        return msg;
-    }
-
-    /**
-     * Registers factory for custom message. Used for test purposes.
-     *
-     * @param type Message type.
-     * @param c Message producer.
-     */
-    public static void registerCustom(short type, IgniteOutClosure<Message> c) {
-        assert c != null;
-
-        CUSTOM.put(type, c);
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
new file mode 100644
index 0000000..eb89043
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Message factory implementation which is responsible for instantiation of all communication messages.
+ */
+public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
+    /** Offset. */
+    private static final int OFF = -Short.MIN_VALUE;
+
+    /** Array size. */
+    private static final int ARR_SIZE = 1 << Short.SIZE;
+
+    /** Custom messages registry. Used for test purposes. */
+    private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>();
+
+    /** Message suppliers. */
+    private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE);
+
+    /** Initialized flag. If {@code true} then new message types can't be registered. */
+    private boolean initialized;
+
+    /**
+     * Constructor.
+     *
+     * @param factories Concrete message factories or message factory providers. Can't be empty or {@code null}.
+     */
+    public IgniteMessageFactoryImpl(MessageFactory[] factories) {
+        if (factories == null || factories.length == 0)
+            throw new IllegalArgumentException("Message factory couldn't be initialized. Factories aren't provided.");
+
+        List<MessageFactory> old = new ArrayList<>(factories.length);
+
+        for (MessageFactory factory : factories) {
+            if (factory instanceof MessageFactoryProvider) {
+                MessageFactoryProvider p = (MessageFactoryProvider)factory;
+
+                p.registerAll(this);
+            }
+            else
+                old.add(factory);
+        }
+
+        if (!old.isEmpty()) {
+            for (int i = 0; i < ARR_SIZE; i++) {
+                Supplier<Message> curr = msgSuppliers[i];
+
+                if (curr == null) {
+                    short directType = indexToDirectType(i);
+
+                    for (MessageFactory factory : old) {
+                        Message msg = factory.create(directType);
+
+                        if (msg != null)
+                            register(directType, () -> factory.create(directType));
+                    }
+                }
+            }
+        }
+
+        initialized = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
+        if (initialized) {
+            throw new IllegalStateException("Message factory is already initialized. " +
+                    "Registration of new message types is forbidden.");
+        }
+
+        int idx = directTypeToIndex(directType);
+
+        Supplier<Message> curr = msgSuppliers[idx];
+
+        if (curr == null)
+            msgSuppliers[idx] = supplier;
+        else
+            throw new IgniteException("Message factory is already registered for direct type: " + directType);
+    }
+
+    /**
+     * Creates new message instance of provided direct type.
+     * <p>
+     *
+     * @param directType Message direct type.
+     * @return Message instance.
+     * @throws IgniteException If there is no message factory for the given {@code directType}.
+     */
+    @Override public @Nullable Message create(short directType) {
+        Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];
+
+        if (supplier == null)
+            supplier = CUSTOM.get(directType);
+
+        if (supplier == null)
+            throw new IgniteException("Invalid message type: " + directType);
+
+        return supplier.get();
+    }
+
+    /**
+     * @param directType Direct type.
+     */
+    private static int directTypeToIndex(short directType) {
+        return directType + OFF;
+    }
+
+    /**
+     * @param idx Index.
+     */
+    private static short indexToDirectType(int idx) {
+        int res = idx - OFF;
+
+        assert res >= Short.MIN_VALUE && res <= Short.MAX_VALUE;
+
+        return (short)res;
+    }
+
+    /**
+     * Registers factory for custom message. Used for test purposes.
+     *
+     * @param type Message type.
+     * @param c Message producer.
+     *
+     * @deprecated Should be removed. Please don't use this method anymore.
+     * Consider using a plugin with its own message types.
+     */
+    @TestOnly
+    @Deprecated
+    public static void registerCustom(short type, Supplier<Message> c) {
+        assert c != null;
+
+        CUSTOM.put(type, c);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
new file mode 100644
index 0000000..ae159b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Message factory for all communication messages registered using {@link #register(short, Supplier)} method call.
+ */
+public interface IgniteMessageFactory extends MessageFactory {
+    /**
+     * Register message factory with given direct type. All messages must be registered during construction
+     * of class which implements this interface. Any invocation of this method after initialization is done must
+     * throw {@link IllegalStateException} exception.
+     *
+     * @param directType Direct type.
+     * @param supplier Message factory.
+     * @throws IgniteException In case of attempt to register message with direct type which is already registered.
+     * @throws IllegalStateException On any invocation of this method when class which implements this interface
+     * is already constructed.
+     */
+    public void register(short directType, Supplier<Message> supplier) throws IgniteException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
index 1ea88fb..07a3d5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -26,7 +26,10 @@ import org.jetbrains.annotations.Nullable;
  * A plugin can provide his own message factory as an extension
  * if it uses any custom messages (all message must extend
  * {@link Message} class).
+ *
+ * @deprecated Use {@link MessageFactoryProvider} instead.
  */
+@Deprecated
 public interface MessageFactory extends Extension {
     /**
      * Creates new message instance of provided type.
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
copy to modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
index 1ea88fb..910c68c 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
@@ -17,25 +17,30 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
-import org.apache.ignite.plugin.Extension;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Factory for communication messages.
+ * Provider of communication message factories.
  * <p>
- * A plugin can provide his own message factory as an extension
- * if it uses any custom messages (all message must extend
- * {@link Message} class).
+ * Implementation of this interface is responsible for registration of all message factories in
+ * {@link #registerAll} method.
+ * <p>
+ * {@link #registerAll} method's call is responsibility of {@link IgniteMessageFactory} implementation.
  */
-public interface MessageFactory extends Extension {
+public interface MessageFactoryProvider extends MessageFactory {
     /**
-     * Creates new message instance of provided type.
-     * <p>
-     * This method should return {@code null} if provided message type
-     * is unknown to this factory.
+     * Registers all message factories. See {@link IgniteMessageFactory#register}.
      *
-     * @param type Message type.
-     * @return Message instance.
+     * @param factory {@link IgniteMessageFactory} implementation.
+     */
+    public void registerAll(IgniteMessageFactory factory);
+
+    /**
+     * Always throws {@link UnsupportedOperationException}.
+     * @param type Message direct type.
+     * @throws UnsupportedOperationException On any invocation.
      */
-    @Nullable public Message create(short type);
+    @Override @Nullable public default Message create(short type) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index d89fe2c..1be4e6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -109,7 +109,7 @@ class TcpCommunicationMetricsListener {
     private final Object msgTypMapMux = new Object();
 
     /** Message type map. */
-    private volatile Map<Short, String> msgTypMap;
+    private volatile Map<Short, String> msgTypeMap;
 
     /** */
     public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
@@ -285,7 +285,7 @@ class TcpCommunicationMetricsListener {
             if (metric.name().startsWith(prefix)) {
                 short directType = Short.parseShort(metric.name().substring(prefix.length()));
 
-                Map<Short, String> msgTypMap0 = msgTypMap;
+                Map<Short, String> msgTypMap0 = msgTypeMap;
 
                 if (msgTypMap0 != null) {
                     String typeName = msgTypMap0.get(directType);
@@ -374,24 +374,24 @@ class TcpCommunicationMetricsListener {
     private void updateMessageTypeMap(Message msg) {
         short typeId = msg.directType();
 
-        Map<Short, String> msgTypMap0 = msgTypMap;
+        Map<Short, String> msgTypMap0 = msgTypeMap;
 
         if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
             synchronized (msgTypMapMux) {
-                if (msgTypMap == null) {
+                if (msgTypeMap == null) {
                     msgTypMap0 = new HashMap<>();
 
                     msgTypMap0.put(typeId, msg.getClass().getName());
 
-                    msgTypMap = msgTypMap0;
+                    msgTypeMap = msgTypMap0;
                 }
                 else {
-                    if (!msgTypMap.containsKey(typeId)) {
-                        msgTypMap0 = new HashMap<>(msgTypMap);
+                    if (!msgTypeMap.containsKey(typeId)) {
+                        msgTypMap0 = new HashMap<>(msgTypeMap);
 
                         msgTypMap0.put(typeId, msg.getClass().getName());
 
-                        msgTypMap = msgTypMap0;
+                        msgTypeMap = msgTypMap0;
                     }
                 }
             }
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
index 6bca88f..7704c0b 100644
--- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -2,3 +2,4 @@ org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteSta
 org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider
 org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin
 org.apache.ignite.plugin.NodeValidationPluginProvider
+
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 108e4672..6dd103e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -22,11 +22,9 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
-import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.IgniteSpiAdapter;
 import org.apache.ignite.spi.IgniteSpiContext;
@@ -48,11 +46,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
     private static final short DIRECT_TYPE = 210;
 
     static {
-        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridIoUserMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new);
     }
 
     /** {@inheritDoc} */
@@ -180,7 +174,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
         private IgniteSpiContext spiCtx;
 
         /** Test message topic. **/
-        private String TEST_TOPIC = "test_topic";
+        private static final String TEST_TOPIC = "test_topic";
 
         /** {@inheritDoc} */
         @Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
@@ -192,6 +186,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
             // No-op.
         }
 
+        /** {@inheritDoc} */
         @Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
             this.spiCtx = spiCtx;
 
@@ -203,6 +198,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
 
         }
 
+        /** {@inheritDoc} */
         @Override public void onContextDestroyed0() {
             spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
                 @Override public boolean apply(UUID uuid, Object o) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8a27a46..0a75cf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -46,20 +45,10 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
     /** */
     private static final short DIRECT_TYPE_OVER_BYTE = 1000;
 
-    /** */
-    private int bufSize;
-
     static {
-        GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestMessage();
-            }
-        });
-        GridIoMessageFactory.registerCustom(DIRECT_TYPE_OVER_BYTE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestOverByteIdMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new);
+
+        IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
     }
 
     /** {@inheritDoc} */
@@ -108,8 +97,6 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
      */
     @Test
     public void testSendMessageWithBuffer() throws Exception {
-        bufSize = 8192;
-
         try {
             startGridsMultiThreaded(2);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
new file mode 100644
index 0000000..a29177f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for default implementation of {@link IgniteMessageFactory} interface.
+ */
+public class IgniteMessageFactoryImplTest {
+    /** Test message 1 type. */
+    private static final short TEST_MSG_1_TYPE = 1;
+
+    /** Test message 2 type. */
+    private static final short TEST_MSG_2_TYPE = 2;
+
+    /** Unknown message type. */
+    private static final short UNKNOWN_MSG_TYPE = 0;
+
+    /**
+     * Tests that it is impossible to register a new message after initialization.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testReadOnly() {
+        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+        msgFactory.register((short)0, () -> null);
+    }
+
+    /**
+     * Tests that proper message type will be returned by message factory.
+     */
+    @Test
+    public void testCreate() {
+        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+        Message msg;
+
+        msg = msgFactory.create(TEST_MSG_1_TYPE);
+        assertTrue(msg instanceof TestMessage1);
+
+        msg = msgFactory.create(TEST_MSG_2_TYPE);
+        assertTrue(msg instanceof TestMessage2);
+
+        msg = msgFactory.create(TEST_MSG_2_TYPE);
+        assertTrue(msg instanceof TestMessage2);
+    }
+
+    /**
+     * Tests that exception will be thrown for unknown message direct type.
+     */
+    @Test(expected = IgniteException.class)
+    public void testCreate_UnknownMessageType() {
+        MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+        IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+        msgFactory.create(UNKNOWN_MSG_TYPE);
+    }
+
+    /**
+     * Tests an attempt to register a message with an already registered message type.
+     */
+    @Test(expected = IgniteException.class)
+    @SuppressWarnings("ResultOfObjectAllocationIgnored")
+    public void testRegisterTheSameType() {
+        MessageFactory[] factories = {
+                new TestMessageFactoryPovider(),
+                new TestMessageFactory(),
+                new TestMessageFactoryPoviderWithTheSameDirectType()
+        };
+
+        new IgniteMessageFactoryImpl(factories);
+    }
+
+
+    /**
+     * {@link MessageFactoryProvider} implementation.
+     */
+    private static class TestMessageFactoryPovider implements MessageFactoryProvider {
+        /** {@inheritDoc} */
+        @Override public void registerAll(IgniteMessageFactory factory) {
+            factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+        }
+    }
+
+    /**
+     * {@link MessageFactoryProvider} implementation with message direct type which is already registered.
+     */
+    private static class TestMessageFactoryPoviderWithTheSameDirectType implements MessageFactoryProvider {
+        /** {@inheritDoc} */
+        @Override public void registerAll(IgniteMessageFactory factory) {
+            factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+        }
+    }
+
+    /**
+     * {@link MessageFactory} implementation which still creates messages with a switch-case.
+     */
+    private static class TestMessageFactory implements MessageFactory {
+        /** {@inheritDoc} */
+        @Override public @Nullable Message create(short type) {
+            switch (type) {
+                case TEST_MSG_2_TYPE:
+                    return new TestMessage2();
+
+                default:
+                    return null;
+            }
+        }
+    }
+
+    /** Test message with direct type 1. */
+    private static class TestMessage1 implements Message {
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return 1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte fieldsCount() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            // No-op.
+        }
+    }
+
+    /** Test message with direct type 2. */
+    private static class TestMessage2 implements Message {
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return 2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte fieldsCount() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            // No-op.
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
new file mode 100644
index 0000000..19ab431
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Tests that a node will not start if some component tries to register a message factory with a direct type
+ * for which a message factory is already registered.
+ */
+public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
+    /** Test plugin name. */
+    private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN";
+
+    /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */
+    private static final short MSG_DIRECT_TYPE = -44;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setPluginProviders(new TestPluginProvider());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Tests that a node will not start if some component tries to register a message factory with a direct type
+     * for which a message factory is already registered.
+     */
+    @Test
+    public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception {
+        assertThrows(log, this::startGrid, IgniteCheckedException.class,
+                "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE);
+    }
+
+    /** Plugin with own message factory. */
+    private static class TestPlugin implements IgnitePlugin {
+    }
+
+    /** Plugin provider which registers a message factory for the {@code MSG_DIRECT_TYPE} direct type. */
+    public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return TEST_PLUGIN_NAME;
+        }
+
+        /** {@inheritDoc} */
+        @Override  public <T extends IgnitePlugin> T plugin() {
+            return (T)new TestPlugin();
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String version() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String copyright() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
+            registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
+                @Override public void registerAll(IgniteMessageFactory factory) {
+                    factory.register(MSG_DIRECT_TYPE, TestMessage::new);
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stop(boolean cancel) throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStart() throws IgniteCheckedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIgniteStop(boolean cancel) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void validateNewNode(ClusterNode node, Serializable data) {
+            // No-op.
+        }
+    }
+
+    /** Empty plugin configuration, used as the type parameter of the test plugin provider. */
+    private static class TestPluginConfiguration implements PluginConfiguration {
+    }
+
+    /** Test message with already registered direct type. */
+    private static class TestMessage implements Message {
+        /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return MSG_DIRECT_TYPE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte fieldsCount() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAckReceived() {
+            // No-op.
+        }
+    }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index ea94bd2..5c6082d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,10 +22,8 @@ import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.typedef.CO;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -42,11 +40,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
     }
 
     /** {@inheritDoc} */
@@ -62,8 +56,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      * @return Cache configuration.
      * @throws Exception In case of error.
      */
-    protected CacheConfiguration cacheConfiguration() throws Exception {
-        CacheConfiguration cfg = defaultCacheConfiguration();
+    protected CacheConfiguration<?, ?> cacheConfiguration() throws Exception {
+        CacheConfiguration<?, ?> cfg = defaultCacheConfiguration();
 
         cfg.setCacheMode(PARTITIONED);
         cfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -113,7 +107,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      */
     @Test
     public void testAddedDeploymentInfo() throws Exception {
-        GridCacheContext ctx = cacheContext();
+        GridCacheContext<?, ?> ctx = cacheContext();
 
         if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
             assertFalse(ctx.deploymentEnabled());
@@ -137,7 +131,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
      */
     @Test
     public void testAddedDeploymentInfo2() throws Exception {
-        GridCacheContext ctx = cacheContext();
+        GridCacheContext<?, ?> ctx = cacheContext();
 
         if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
             assertFalse(ctx.deploymentEnabled());
@@ -161,8 +155,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
     /**
      * @return Cache context.
      */
-    protected GridCacheContext cacheContext() {
-        return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context();
+    protected GridCacheContext<?, ?> cacheContext() {
+        return ((IgniteCacheProxy<?, ?>)grid(0).cache(DEFAULT_CACHE_NAME)).context();
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index aeaabda..49d588f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -33,9 +33,11 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -60,7 +62,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
         ccfg.setCacheMode(PARTITIONED);
         ccfg.setAtomicityMode(atomicityMode());
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -153,7 +155,9 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
         e0.writeTo(buf, writer);
 
         CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
-        e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1));
+        IgniteMessageFactoryImpl msgFactory =
+                new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+        e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(msgFactory, (byte)1));
 
         assertEquals(e0.cacheId(), e1.cacheId());
         assertEquals(e0.eventType(), e1.eventType());
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 8034093..326b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -29,9 +29,8 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -52,7 +51,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
  * Super class for all communication self tests.
  * @param <T> Type of communication SPI.
  */
-public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
     /** */
     private static long msgId = 1;
 
@@ -75,17 +74,13 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
     private static GridTimeoutProcessor timeoutProcessor;
 
     /** */
-    protected boolean useSsl = false;
+    protected boolean useSsl;
 
     /**
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /** */
@@ -162,7 +157,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             for (ClusterNode node : nodes) {
                 synchronized (mux) {
                     if (!msgDestMap.containsKey(entry.getKey()))
-                        msgDestMap.put(entry.getKey(), new HashSet<UUID>());
+                        msgDestMap.put(entry.getKey(), new HashSet<>());
 
                     msgDestMap.get(entry.getKey()).add(node.id());
                 }
@@ -208,7 +203,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
             for (ClusterNode node : nodes) {
                 synchronized (mux) {
                     if (!msgDestMap.containsKey(sndId))
-                        msgDestMap.put(sndId, new HashSet<UUID>());
+                        msgDestMap.put(sndId, new HashSet<>());
 
                     msgDestMap.get(sndId).add(node.id());
                 }
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 797328e..21a846d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -31,14 +31,13 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.failure.AbstractFailureHandler;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -66,35 +65,15 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
 
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
 
-        GridIoMessageFactory.registerCustom(TestMessage1.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestMessage1();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new);
 
-        GridIoMessageFactory.registerCustom(TestMessage2.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestMessage2();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new);
 
-        GridIoMessageFactory.registerCustom(TestBadMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new TestBadMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
     }
 
     /** {@inheritDoc} */
@@ -105,7 +84,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
         cfg.setFailureHandler(new TestFailureHandler());
 
-        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(CacheMode.PARTITIONED);
         ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
@@ -146,25 +125,25 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
         try {
             startGrids(2);
 
-            Ignite ignite0 = grid(0);
-            Ignite ignite1 = grid(1);
+            IgniteEx ignite0 = grid(0);
+            IgniteEx ignite1 = grid(1);
 
-            ((IgniteKernal)ignite0).context().cache().context().io().addCacheHandler(
+            ignite0.context().cache().context().io().addCacheHandler(
                 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
                 @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                     throw new RuntimeException("Test bad message exception");
                 }
             });
 
-            ((IgniteKernal)ignite1).context().cache().context().io().addCacheHandler(
+            ignite1.context().cache().context().io().addCacheHandler(
                 0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
                     @Override public void apply(UUID nodeId, GridCacheMessage msg) {
                         throw new RuntimeException("Test bad message exception");
                     }
                 });
 
-            ((IgniteKernal)ignite0).context().cache().context().io().send(
-                ((IgniteKernal)ignite1).localNode().id(), new TestBadMessage(), (byte)2);
+            ignite0.context().cache().context().io().send(
+                ignite1.localNode().id(), new TestBadMessage(), (byte)2);
 
             boolean res = failureLatch.await(5, TimeUnit.SECONDS);
 
@@ -179,8 +158,8 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void doSend() throws Exception {
-        GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
-        GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
+        GridIoManager mgr0 = grid(0).context().io();
+        GridIoManager mgr1 = grid(1).context().io();
 
         String topic = "test-topic";
 
@@ -195,14 +174,14 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
                     assertEquals(10, messages.size());
 
-                    int count = 0;
+                    int cnt = 0;
 
                     for (TestMessage1 msg1 : messages) {
                         assertTrue(msg1.body().contains(TEST_BODY));
 
                         int i = Integer.parseInt(msg1.body().substring(TEST_BODY.length() + 1));
 
-                        assertEquals(count, i);
+                        assertEquals(cnt, i);
 
                         TestMessage2 msg2 = (TestMessage2) msg1.message();
 
@@ -214,11 +193,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
 
                         GridTestMessage msg3 = (GridTestMessage) msg2.message();
 
-                        assertEquals(count, msg3.getMsgId());
+                        assertEquals(cnt, msg3.getMsgId());
 
                         assertEquals(grid(1).localNode().id(), msg3.getSourceNodeId());
 
-                        count++;
+                        cnt++;
                     }
                 }
                 catch (Exception e) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 7940a63..54e5386 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
 /**
  * Test for {@link TcpCommunicationSpi}
  */
-abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
+abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> {
     /** */
     private static final int SPI_COUNT = 3;
 
@@ -59,7 +59,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     }
 
     /** {@inheritDoc} */
-    @Override protected CommunicationSpi getSpi(int idx) {
+    @Override protected CommunicationSpi<Message> getSpi(int idx) {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
         if (!useShmem)
@@ -88,7 +88,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.testSendToManyNodes();
 
         // Test idle clients remove.
-        for (CommunicationSpi spi : spis.values()) {
+        for (CommunicationSpi<Message> spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
             assertEquals(getSpiCount() - 1, clients.size());
@@ -129,13 +129,13 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
 
         final CyclicBarrier b = new CyclicBarrier(THREADS);
 
-        List<IgniteInternalFuture> futs = new ArrayList<>();
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
             final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
 
-            futs.add(GridTestUtils.runAsync(new Callable() {
-                @Override public Object call() throws Exception {
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
                     List<ClusterNode> checkNodes = new ArrayList<>(nodes);
 
                     assert checkNodes.size() > 1;
@@ -156,7 +156,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
             }));
         }
 
-        for (IgniteInternalFuture f : futs)
+        for (IgniteInternalFuture<?> f : futs)
             f.get();
     }
 
@@ -164,7 +164,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        for (CommunicationSpi spi : spis.values()) {
+        for (CommunicationSpi<Message> spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
 
             for (int i = 0; i < 20; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 7ab0d6f..69e4bef 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -37,13 +37,12 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -65,7 +64,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi>
+public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi<Message>>
     extends GridSpiAbstractTest<T> {
     /** */
     private static final int SPI_CNT = 2;
@@ -101,11 +100,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
@@ -356,24 +351,24 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
-                    for (CommunicationSpi spi : spis) {
+                    for (CommunicationSpi<?> spi : spis) {
                         ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
                         assertEquals(1, clients.size());
 
-                        final GridNioServer srv = U.field(spi, "nioSrvr");
+                        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
                         final int conns = pairedConnections ? 2 : 1;
 
                         GridTestUtils.waitForCondition(new GridAbsPredicate() {
                             @Override public boolean apply() {
-                                Collection sessions = U.field(srv, "sessions");
+                                Collection<?> sessions = U.field(srv, "sessions");
 
                                 return sessions.size() == conns * connectionsPerNode;
                             }
                         }, 5000);
 
-                        Collection sessions = U.field(srv, "sessions");
+                        Collection<?> sessions = U.field(srv, "sessions");
 
                         assertEquals(conns * connectionsPerNode, sessions.size());
                     }
@@ -396,7 +391,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /**
      * @return SPI.
      */
-    private CommunicationSpi createSpi() {
+    private CommunicationSpi<Message> createSpi() {
         TcpCommunicationSpi spi = new TcpCommunicationSpi();
 
         spi.setLocalAddress("127.0.0.1");
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index a53b43b..f99df2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -38,14 +38,13 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -100,11 +99,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     private static boolean reject;
 
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 408eb10..d99f48f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -27,14 +27,13 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -56,7 +55,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
@@ -76,11 +75,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
@@ -173,7 +168,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
                 final long totAcked0 = totAcked;
 
                 for (TcpCommunicationSpi spi : spis) {
-                    GridNioServer srv = U.field(spi, "nioSrvr");
+                    GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
                     final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 
@@ -281,7 +276,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
         ClusterNode node0 = nodes.get(0);
         ClusterNode node1 = nodes.get(1);
 
-        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+        final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
 
         int msgId = 0;
 
@@ -341,7 +336,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
-        final GridNioServer srv = U.field(spi, "nioSrvr");
+        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5ec734a..1d03590 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,13 +32,12 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -61,7 +60,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
 
@@ -90,11 +89,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
@@ -674,7 +669,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi, boolean in) throws Exception {
-        final GridNioServer srv = U.field(spi, "nioSrvr");
+        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index d937bb0..ef9b413 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -59,7 +58,7 @@ import org.junit.Test;
  *
  */
 @GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi<Message>>
     extends GridSpiAbstractTest<T> {
     /** */
     private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -80,11 +79,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      *
      */
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
@@ -95,7 +90,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     }
 
     /** */
-    private class TestListener implements CommunicationListener<Message> {
+    private static class TestListener implements CommunicationListener<Message> {
         /** */
         private GridConcurrentHashSet<Long> msgIds = new GridConcurrentHashSet<>();
 
@@ -194,7 +189,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                 final long totAcked0 = totAcked;
 
                 for (TcpCommunicationSpi spi : spis) {
-                    GridNioServer srv = U.field(spi, "nioSrvr");
+                    GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
                     Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 
@@ -306,7 +301,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
         // Check that session will not be closed by idle timeout because expected close by queue overflow.
         assertTrue(spi0.getIdleConnectionTimeout() > awaitTime);
 
-        final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+        final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
 
         // For prevent session close by write timeout.
         srv1.writeTimeout(60_000);
@@ -392,7 +387,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
      * @throws Exception If failed.
      */
     private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
-        final GridNioServer srv = U.field(spi, "nioSrvr");
+        final GridNioServer<?> srv = U.field(spi, "nioSrvr");
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 99840c8..5cb8999 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -32,12 +32,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
 import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -63,11 +62,7 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
     private final CountDownLatch latch = new CountDownLatch(1);
 
     static {
-        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
-            @Override public Message apply() {
-                return new GridTestMessage();
-            }
-        });
+        IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
     }
 
     /**
@@ -121,10 +116,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
         ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs",
             SynchronizedCommunicationSpi.class.getSimpleName());
 
-        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
 
-        if (mbeanServer.isRegistered(mbeanName))
-            return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class,
+        if (mbeanSrv.isRegistered(mbeanName))
+            return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TcpCommunicationSpiMBean.class,
                 true);
         else
             fail("MBean is not registered: " + mbeanName.getCanonicalName());
@@ -159,10 +154,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
 
             latch.await(10, TimeUnit.SECONDS);
 
-            ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+            ClusterGroup clusterGrpNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
 
             // Send job from node0 to node1.
-            grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
+            grid(0).compute(clusterGrpNode1).call(new IgniteCallable<Boolean>() {
                 @Override public Boolean call() throws Exception {
                     return Boolean.TRUE;
                 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index da43ded..6424bfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -567,7 +568,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** {@inheritDoc} */
     @Override public MessageFactory messageFactory() {
         if (factory == null)
-            factory = new GridIoMessageFactory(null);
+            factory = new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
 
         return factory;
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1f41a08..84aec2e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -45,7 +45,9 @@ import org.apache.ignite.internal.managers.communication.IgniteCommunicationBala
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationSslBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImplTest;
 import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
+import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConflictTest;
 import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
@@ -330,6 +332,8 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationBalanceMultipleConnectionsTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationSslBalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, IgniteMessageFactoryImplTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, MessageDirectTypeIdConflictTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index ce04839..0720b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -21,7 +21,9 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -122,7 +124,8 @@ public class GridMessageCollectionTest {
 
         assertEquals(m.directType(), type);
 
-        GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
+        IgniteMessageFactory msgFactory =
+                new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
 
         Message mx = msgFactory.create(type);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 0ff53f7..e4aae7d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,107 +23,52 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * H2 Value message factory.
  */
-public class GridH2ValueMessageFactory implements MessageFactory {
+public class GridH2ValueMessageFactory implements MessageFactoryProvider {
     /** {@inheritDoc} */
-    @Nullable @Override public Message create(short type) {
-        switch (type) {
-            case -4:
-                return GridH2Null.INSTANCE;
-
-            case -5:
-                return new GridH2Boolean();
-
-            case -6:
-                return new GridH2Byte();
-
-            case -7:
-                return new GridH2Short();
-
-            case -8:
-                return new GridH2Integer();
-
-            case -9:
-                return new GridH2Long();
-
-            case -10:
-                return new GridH2Decimal();
-
-            case -11:
-                return new GridH2Double();
-
-            case -12:
-                return new GridH2Float();
-
-            case -13:
-                return new GridH2Time();
-
-            case -14:
-                return new GridH2Date();
-
-            case -15:
-                return new GridH2Timestamp();
-
-            case -16:
-                return new GridH2Bytes();
-
-            case -17:
-                return new GridH2String();
-
-            case -18:
-                return new GridH2Array();
-
-            case -19:
-                return new GridH2JavaObject();
-
-            case -20:
-                return new GridH2Uuid();
-
-            case -21:
-                return new GridH2Geometry();
-
-            case -22:
-                return new GridH2CacheObject();
-
-            case -30:
-                return new GridH2IndexRangeRequest();
-
-            case -31:
-                return new GridH2IndexRangeResponse();
-
-            case -32:
-                return new GridH2RowMessage();
-
-            case -33:
-                return new GridH2QueryRequest();
-
-            case -34:
-                return new GridH2RowRange();
-
-            case -35:
-                return new GridH2RowRangeBounds();
-
-            case -54:
-                return new QueryTable();
-
-            case -55:
-                return new GridH2DmlRequest();
-
-            case -56:
-                return new GridH2DmlResponse();
-
-            case -57:
-                return new GridH2SelectForUpdateTxDetails();
-        }
+    @Override public void registerAll(IgniteMessageFactory factory) {
+        factory.register((short)-4, () -> GridH2Null.INSTANCE);
+        factory.register((short)-5, GridH2Boolean::new);
+        factory.register((short)-6, GridH2Byte::new);
+        factory.register((short)-7, GridH2Short::new);
+        factory.register((short)-8, GridH2Integer::new);
+        factory.register((short)-9, GridH2Long::new);
+        factory.register((short)-10, GridH2Decimal::new);
+        factory.register((short)-11, GridH2Double::new);
+        factory.register((short)-12, GridH2Float::new);
+        factory.register((short)-13, GridH2Time::new);
+        factory.register((short)-14, GridH2Date::new);
+        factory.register((short)-15, GridH2Timestamp::new);
+        factory.register((short)-16, GridH2Bytes::new);
+        factory.register((short)-17, GridH2String::new);
+        factory.register((short)-18, GridH2Array::new);
+        factory.register((short)-19, GridH2JavaObject::new);
+        factory.register((short)-20, GridH2Uuid::new);
+        factory.register((short)-21, GridH2Geometry::new);
+        factory.register((short)-22, GridH2CacheObject::new);
+        factory.register((short)-30, GridH2IndexRangeRequest::new);
+        factory.register((short)-31, GridH2IndexRangeResponse::new);
+        factory.register((short)-32, GridH2RowMessage::new);
+        factory.register((short)-33, GridH2QueryRequest::new);
+        factory.register((short)-34, GridH2RowRange::new);
+        factory.register((short)-35, GridH2RowRangeBounds::new);
+        factory.register((short)-54, QueryTable::new);
+        factory.register((short)-55, GridH2DmlRequest::new);
+        factory.register((short)-56, GridH2DmlResponse::new);
+        factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
+    }
 
-        return null;
+    /** {@inheritDoc} */
+    @Override @Nullable public Message create(short type) {
+        throw new UnsupportedOperationException();
     }
 
     /**