You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/02/04 15:15:59 UTC
[ignite] branch master updated: IGNITE-12568 MessageFactory is
refactored in order to detect registration of message with the same direct
type
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 65c30ec IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type
65c30ec is described below
commit 65c30ec6947d35f18b620a7fc9fd0f6d5774317c
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Mon Jan 27 17:49:32 2020 +0300
IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type
---
.../managers/communication/GridIoManager.java | 4 +-
.../communication/GridIoMessageFactory.java | 1203 ++++----------------
.../communication/IgniteMessageFactoryImpl.java | 166 +++
.../communication/IgniteMessageFactory.java | 39 +
.../extensions/communication/MessageFactory.java | 3 +
...ageFactory.java => MessageFactoryProvider.java} | 31 +-
.../tcp/TcpCommunicationMetricsListener.java | 16 +-
.../org.apache.ignite.plugin.PluginProvider | 1 +
.../GridManagerLocalMessageListenerSelfTest.java | 14 +-
.../GridCommunicationSendMessageSelfTest.java | 19 +-
.../IgniteMessageFactoryImplTest.java | 199 ++++
.../MessageDirectTypeIdConflictTest.java | 209 ++++
.../GridCacheConditionalDeploymentSelfTest.java | 22 +-
...niteCacheContinuousQueryImmutableEntryTest.java | 8 +-
.../GridAbstractCommunicationSelfTest.java | 17 +-
.../communication/GridCacheMessageSelfTest.java | 61 +-
.../tcp/GridTcpCommunicationSpiAbstractTest.java | 16 +-
...pCommunicationSpiConcurrentConnectSelfTest.java | 21 +-
...idTcpCommunicationSpiMultithreadedSelfTest.java | 9 +-
...GridTcpCommunicationSpiRecoveryAckSelfTest.java | 17 +-
.../GridTcpCommunicationSpiRecoverySelfTest.java | 13 +-
...TcpCommunicationRecoveryAckClosureSelfTest.java | 19 +-
.../tcp/TcpCommunicationStatisticsTest.java | 19 +-
.../ignite/testframework/GridSpiTestContext.java | 3 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 5 +
.../ignite/util/GridMessageCollectionTest.java | 5 +-
.../h2/twostep/msg/GridH2ValueMessageFactory.java | 129 +--
27 files changed, 986 insertions(+), 1282 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9821edf..e8aeb53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -498,6 +498,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
List<MessageFactory> compMsgs = new ArrayList<>();
+ compMsgs.add(new GridIoMessageFactory());
+
for (IgniteComponentType compType : IgniteComponentType.values()) {
MessageFactory f = compType.messageFactory();
@@ -508,7 +510,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (!compMsgs.isEmpty())
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
- msgFactory = new GridIoMessageFactory(msgs);
+ msgFactory = new IgniteMessageFactoryImpl(msgs);
if (log.isDebugEnabled())
log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e82bb26..0d7976e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
@@ -196,9 +193,9 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridMessageCollection;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
@@ -210,1005 +207,209 @@ import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMess
/**
* Message factory implementation.
*/
-public class GridIoMessageFactory implements MessageFactory {
- /** Custom messages registry. Used for test purposes. */
- private static final Map<Short, IgniteOutClosure<Message>> CUSTOM = new ConcurrentHashMap<>();
-
- /** Extensions. */
- private final MessageFactory[] ext;
-
- /**
- * @param ext Extensions.
- */
- public GridIoMessageFactory(MessageFactory[] ext) {
- this.ext = ext;
+public class GridIoMessageFactory implements MessageFactoryProvider {
+ /** {@inheritDoc} */
+ @Override public void registerAll(IgniteMessageFactory factory) {
+ // -54 is reserved for SQL.
+ // -46 ... -51 - snapshot messages.
+ factory.register((short)-61, IgniteDiagnosticMessage::new);
+ factory.register((short)-53, SchemaOperationStatusMessage::new);
+ factory.register((short)-52, GridIntList::new);
+ factory.register((short)-51, NearCacheUpdates::new);
+ factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
+ factory.register((short)-49, UpdateErrors::new);
+ factory.register((short)-48, GridDhtAtomicNearResponse::new);
+ factory.register((short)-45, GridChangeGlobalStateMessageResponse::new);
+ factory.register((short)-44, HandshakeMessage2::new);
+ factory.register((short)-43, IgniteIoTestMessage::new);
+ factory.register((short)-42, HadoopDirectShuffleMessage::new);
+ factory.register((short)-41, HadoopShuffleFinishResponse::new);
+ factory.register((short)-40, HadoopShuffleFinishRequest::new);
+ factory.register((short)-39, HadoopJobId::new);
+ factory.register((short)-38, HadoopShuffleAck::new);
+ factory.register((short)-37, HadoopShuffleMessage::new);
+ factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
+ factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new);
+ factory.register((short)-26, TxLockList::new);
+ factory.register((short)-25, TxLock::new);
+ factory.register((short)-24, TxLocksRequest::new);
+ factory.register((short)-23, TxLocksResponse::new);
+ factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, NodeIdMessage::new);
+ factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, RecoveryLastReceivedMessage::new);
+ factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new);
+ factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new);
+ factory.register((short)0, GridJobCancelRequest::new);
+ factory.register((short)1, GridJobExecuteRequest::new);
+ factory.register((short)2, GridJobExecuteResponse::new);
+ factory.register((short)3, GridJobSiblingsRequest::new);
+ factory.register((short)4, GridJobSiblingsResponse::new);
+ factory.register((short)5, GridTaskCancelRequest::new);
+ factory.register((short)6, GridTaskSessionRequest::new);
+ factory.register((short)7, GridCheckpointRequest::new);
+ factory.register((short)8, GridIoMessage::new);
+ factory.register((short)9, GridIoUserMessage::new);
+ factory.register((short)10, GridDeploymentInfoBean::new);
+ factory.register((short)11, GridDeploymentRequest::new);
+ factory.register((short)12, GridDeploymentResponse::new);
+ factory.register((short)13, GridEventStorageMessage::new);
+ factory.register((short)16, GridCacheTxRecoveryRequest::new);
+ factory.register((short)17, GridCacheTxRecoveryResponse::new);
+ factory.register((short)20, GridCacheTtlUpdateRequest::new);
+ factory.register((short)21, GridDistributedLockRequest::new);
+ factory.register((short)22, GridDistributedLockResponse::new);
+ factory.register((short)23, GridDistributedTxFinishRequest::new);
+ factory.register((short)24, GridDistributedTxFinishResponse::new);
+ factory.register((short)25, GridDistributedTxPrepareRequest::new);
+ factory.register((short)26, GridDistributedTxPrepareResponse::new);
+ factory.register((short)27, GridDistributedUnlockRequest::new);
+ factory.register((short)28, GridDhtAffinityAssignmentRequest::new);
+ factory.register((short)29, GridDhtAffinityAssignmentResponse::new);
+ factory.register((short)30, GridDhtLockRequest::new);
+ factory.register((short)31, GridDhtLockResponse::new);
+ factory.register((short)32, GridDhtTxFinishRequest::new);
+ factory.register((short)33, GridDhtTxFinishResponse::new);
+ factory.register((short)34, GridDhtTxPrepareRequest::new);
+ factory.register((short)35, GridDhtTxPrepareResponse::new);
+ factory.register((short)36, GridDhtUnlockRequest::new);
+ factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
+ factory.register((short)38, GridDhtAtomicUpdateRequest::new);
+ factory.register((short)39, GridDhtAtomicUpdateResponse::new);
+ factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
+ factory.register((short)41, GridNearAtomicUpdateResponse::new);
+ factory.register((short)42, GridDhtForceKeysRequest::new);
+ factory.register((short)43, GridDhtForceKeysResponse::new);
+ factory.register((short)44, GridDhtPartitionDemandLegacyMessage::new);
+ factory.register((short)45, GridDhtPartitionDemandMessage::new);
+ factory.register((short)46, GridDhtPartitionsFullMessage::new);
+ factory.register((short)47, GridDhtPartitionsSingleMessage::new);
+ factory.register((short)48, GridDhtPartitionsSingleRequest::new);
+ factory.register((short)49, GridNearGetRequest::new);
+ factory.register((short)50, GridNearGetResponse::new);
+ factory.register((short)51, GridNearLockRequest::new);
+ factory.register((short)52, GridNearLockResponse::new);
+ factory.register((short)53, GridNearTxFinishRequest::new);
+ factory.register((short)54, GridNearTxFinishResponse::new);
+ factory.register((short)55, GridNearTxPrepareRequest::new);
+ factory.register((short)56, GridNearTxPrepareResponse::new);
+ factory.register((short)57, GridNearUnlockRequest::new);
+ factory.register((short)58, GridCacheQueryRequest::new);
+ factory.register((short)59, GridCacheQueryResponse::new);
+ factory.register((short)61, GridContinuousMessage::new);
+ factory.register((short)62, DataStreamerRequest::new);
+ factory.register((short)63, DataStreamerResponse::new);
+ factory.register((short)64, IgfsAckMessage::new);
+ factory.register((short)65, IgfsBlockKey::new);
+ factory.register((short)66, IgfsBlocksMessage::new);
+ factory.register((short)67, IgfsDeleteMessage::new);
+ factory.register((short)68, IgfsFileAffinityRange::new);
+ factory.register((short)69, IgfsFragmentizerRequest::new);
+ factory.register((short)70, IgfsFragmentizerResponse::new);
+ factory.register((short)71, IgfsSyncMessage::new);
+ factory.register((short)76, GridTaskResultRequest::new);
+ factory.register((short)77, GridTaskResultResponse::new);
+ factory.register((short)78, MissingMappingRequestMessage::new);
+ factory.register((short)79, MissingMappingResponseMessage::new);
+ factory.register((short)80, MetadataRequestMessage::new);
+ factory.register((short)81, MetadataResponseMessage::new);
+ factory.register((short)82, JobStealingRequest::new);
+ factory.register((short)84, GridByteArrayList::new);
+ factory.register((short)85, GridLongList::new);
+ factory.register((short)86, GridCacheVersion::new);
+ factory.register((short)87, GridDhtPartitionExchangeId::new);
+ factory.register((short)88, GridCacheReturn::new);
+ factory.register((short)89, CacheObjectImpl::new);
+ factory.register((short)90, KeyCacheObjectImpl::new);
+ factory.register((short)91, GridCacheEntryInfo::new);
+ factory.register((short)92, CacheEntryInfoCollection::new);
+ factory.register((short)93, CacheInvokeDirectResult::new);
+ factory.register((short)94, IgniteTxKey::new);
+ factory.register((short)95, DataStreamerEntry::new);
+ factory.register((short)96, CacheContinuousQueryEntry::new);
+ factory.register((short)97, CacheEvictionEntry::new);
+ factory.register((short)98, CacheEntryPredicateContainsValue::new);
+ factory.register((short)99, CacheEntrySerializablePredicate::new);
+ factory.register((short)100, IgniteTxEntry::new);
+ factory.register((short)101, TxEntryValueHolder::new);
+ factory.register((short)102, CacheVersionedValue::new);
+ factory.register((short)103, GridCacheRawVersionedEntry::new);
+ factory.register((short)104, GridCacheVersionEx::new);
+ factory.register((short)105, CacheObjectByteArrayImpl::new);
+ factory.register((short)106, GridQueryCancelRequest::new);
+ factory.register((short)107, GridQueryFailResponse::new);
+ factory.register((short)108, GridQueryNextPageRequest::new);
+ factory.register((short)109, GridQueryNextPageResponse::new);
+ factory.register((short)111, AffinityTopologyVersion::new);
+ factory.register((short)112, GridCacheSqlQuery::new);
+ factory.register((short)113, BinaryObjectImpl::new);
+ factory.register((short)114, GridDhtPartitionSupplyMessage::new);
+ factory.register((short)115, UUIDCollectionMessage::new);
+ factory.register((short)116, GridNearSingleGetRequest::new);
+ factory.register((short)117, GridNearSingleGetResponse::new);
+ factory.register((short)118, CacheContinuousQueryBatchAck::new);
+ factory.register((short)119, BinaryEnumObjectImpl::new);
+
+ // [120..123] - DR
+ factory.register((short)124, GridMessageCollection::new);
+ factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
+ factory.register((short)126, GridNearAtomicSingleUpdateInvokeRequest::new);
+ factory.register((short)127, GridNearAtomicSingleUpdateFilterRequest::new);
+ factory.register((short)128, CacheGroupAffinityMessage::new);
+ factory.register((short)129, WalStateAckMessage::new);
+ factory.register((short)130, UserManagementOperationFinishedMessage::new);
+ factory.register((short)131, UserAuthenticateRequestMessage::new);
+ factory.register((short)132, UserAuthenticateResponseMessage::new);
+ factory.register((short)133, ClusterMetricsUpdateMessage::new);
+ factory.register((short)134, ContinuousRoutineStartResultMessage::new);
+ factory.register((short)135, LatchAckMessage::new);
+ factory.register((short)136, MvccTxSnapshotRequest::new);
+ factory.register((short)137, MvccAckRequestTx::new);
+ factory.register((short)138, MvccFutureResponse::new);
+ factory.register((short)139, MvccQuerySnapshotRequest::new);
+ factory.register((short)140, MvccAckRequestQueryCntr::new);
+ factory.register((short)141, MvccSnapshotResponse::new);
+ factory.register((short)143, GridCacheMvccEntryInfo::new);
+ factory.register((short)144, GridDhtTxQueryEnlistResponse::new);
+ factory.register((short)145, MvccAckRequestQueryId::new);
+ factory.register((short)146, MvccAckRequestTxAndQueryCntr::new);
+ factory.register((short)147, MvccAckRequestTxAndQueryId::new);
+ factory.register((short)148, MvccVersionImpl::new);
+ factory.register((short)149, MvccActiveQueriesMessage::new);
+ factory.register((short)150, MvccSnapshotWithoutTxs::new);
+ factory.register((short)151, GridNearTxQueryEnlistRequest::new);
+ factory.register((short)152, GridNearTxQueryEnlistResponse::new);
+ factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new);
+ factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new);
+ factory.register((short)155, GridDhtTxQueryEnlistRequest::new);
+ factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new);
+ factory.register((short)157, PartitionUpdateCountersMessage::new);
+ factory.register((short)158, GridDhtPartitionSupplyMessageV2::new);
+ factory.register((short)159, GridNearTxEnlistRequest::new);
+ factory.register((short)160, GridNearTxEnlistResponse::new);
+ factory.register((short)161, GridInvokeValue::new);
+ factory.register((short)162, GenerateEncryptionKeyRequest::new);
+ factory.register((short)163, GenerateEncryptionKeyResponse::new);
+ factory.register((short)164, MvccRecoveryFinishedMessage::new);
+ factory.register((short)165, PartitionCountersNeighborcastRequest::new);
+ factory.register((short)166, PartitionCountersNeighborcastResponse::new);
+ factory.register((short)167, ServiceDeploymentProcessId::new);
+ factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new);
+ factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
+ factory.register((short)170, DeadlockProbe::new);
+ factory.register((short)171, ProbedTx::new);
+ factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new);
+ factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new);
+ factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
+ factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
+ factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
+
+ // [-3..119] [124..129] [-23..-28] [-36..-55] - this
+ // [120..123] - DR
+ // [-4..-22, -30..-35] - SQL
+ // [2048..2053] - Snapshots
}
/** {@inheritDoc} */
@Override public Message create(short type) {
- Message msg = null;
-
- switch (type) {
- // -54 is reserved for SQL.
- // -46 ... -51 - snapshot messages.
- case -61:
- msg = new IgniteDiagnosticMessage();
-
- break;
-
- case -53:
- msg = new SchemaOperationStatusMessage();
-
- break;
-
- case -52:
- msg = new GridIntList();
-
- break;
-
- case -51:
- msg = new NearCacheUpdates();
-
- break;
-
- case -50:
- msg = new GridNearAtomicCheckUpdateRequest();
-
- break;
-
- case -49:
- msg = new UpdateErrors();
-
- break;
-
- case -48:
- msg = new GridDhtAtomicNearResponse();
-
- break;
-
- case -45:
- msg = new GridChangeGlobalStateMessageResponse();
-
- break;
-
- case -44:
- msg = new HandshakeMessage2();
-
- break;
-
- case -43:
- msg = new IgniteIoTestMessage();
-
- break;
-
- case -42:
- msg = new HadoopDirectShuffleMessage();
-
- break;
-
- case -41:
- msg = new HadoopShuffleFinishResponse();
-
- break;
-
- case -40:
- msg = new HadoopShuffleFinishRequest();
-
- break;
-
- case -39:
- msg = new HadoopJobId();
-
- break;
-
- case -38:
- msg = new HadoopShuffleAck();
-
- break;
-
- case -37:
- msg = new HadoopShuffleMessage();
-
- break;
-
- case -36:
- msg = new GridDhtAtomicSingleUpdateRequest();
-
- break;
-
- case -27:
- msg = new GridDhtTxOnePhaseCommitAckRequest();
-
- break;
-
- case -26:
- msg = new TxLockList();
-
- break;
-
- case -25:
- msg = new TxLock();
-
- break;
-
- case -24:
- msg = new TxLocksRequest();
-
- break;
-
- case -23:
- msg = new TxLocksResponse();
-
- break;
-
- case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
- msg = new NodeIdMessage();
-
- break;
-
- case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
- msg = new RecoveryLastReceivedMessage();
-
- break;
-
- case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
- msg = new HandshakeMessage();
-
- break;
-
- case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE:
- msg = new HandshakeWaitMessage();
-
- break;
-
- case 0:
- msg = new GridJobCancelRequest();
-
- break;
-
- case 1:
- msg = new GridJobExecuteRequest();
-
- break;
-
- case 2:
- msg = new GridJobExecuteResponse();
-
- break;
-
- case 3:
- msg = new GridJobSiblingsRequest();
-
- break;
-
- case 4:
- msg = new GridJobSiblingsResponse();
-
- break;
-
- case 5:
- msg = new GridTaskCancelRequest();
-
- break;
-
- case 6:
- msg = new GridTaskSessionRequest();
-
- break;
-
- case 7:
- msg = new GridCheckpointRequest();
-
- break;
-
- case 8:
- msg = new GridIoMessage();
-
- break;
-
- case 9:
- msg = new GridIoUserMessage();
-
- break;
-
- case 10:
- msg = new GridDeploymentInfoBean();
-
- break;
-
- case 11:
- msg = new GridDeploymentRequest();
-
- break;
-
- case 12:
- msg = new GridDeploymentResponse();
-
- break;
-
- case 13:
- msg = new GridEventStorageMessage();
-
- break;
-
- case 16:
- msg = new GridCacheTxRecoveryRequest();
-
- break;
-
- case 17:
- msg = new GridCacheTxRecoveryResponse();
-
- break;
-
- case 20:
- msg = new GridCacheTtlUpdateRequest();
-
- break;
-
- case 21:
- msg = new GridDistributedLockRequest();
-
- break;
-
- case 22:
- msg = new GridDistributedLockResponse();
-
- break;
-
- case 23:
- msg = new GridDistributedTxFinishRequest();
-
- break;
-
- case 24:
- msg = new GridDistributedTxFinishResponse();
-
- break;
-
- case 25:
- msg = new GridDistributedTxPrepareRequest();
-
- break;
-
- case 26:
- msg = new GridDistributedTxPrepareResponse();
-
- break;
-
- case 27:
- msg = new GridDistributedUnlockRequest();
-
- break;
-
- case 28:
- msg = new GridDhtAffinityAssignmentRequest();
-
- break;
-
- case 29:
- msg = new GridDhtAffinityAssignmentResponse();
-
- break;
-
- case 30:
- msg = new GridDhtLockRequest();
-
- break;
-
- case 31:
- msg = new GridDhtLockResponse();
-
- break;
-
- case 32:
- msg = new GridDhtTxFinishRequest();
-
- break;
-
- case 33:
- msg = new GridDhtTxFinishResponse();
-
- break;
-
- case 34:
- msg = new GridDhtTxPrepareRequest();
-
- break;
-
- case 35:
- msg = new GridDhtTxPrepareResponse();
-
- break;
-
- case 36:
- msg = new GridDhtUnlockRequest();
-
- break;
-
- case 37:
- msg = new GridDhtAtomicDeferredUpdateResponse();
-
- break;
-
- case 38:
- msg = new GridDhtAtomicUpdateRequest();
-
- break;
-
- case 39:
- msg = new GridDhtAtomicUpdateResponse();
-
- break;
-
- case 40:
- msg = new GridNearAtomicFullUpdateRequest();
-
- break;
-
- case 41:
- msg = new GridNearAtomicUpdateResponse();
-
- break;
-
- case 42:
- msg = new GridDhtForceKeysRequest();
-
- break;
-
- case 43:
- msg = new GridDhtForceKeysResponse();
-
- break;
-
- case 44:
- msg = new GridDhtPartitionDemandLegacyMessage();
-
- break;
-
- case 45:
- msg = new GridDhtPartitionDemandMessage();
-
- break;
-
- case 46:
- msg = new GridDhtPartitionsFullMessage();
-
- break;
-
- case 47:
- msg = new GridDhtPartitionsSingleMessage();
-
- break;
-
- case 48:
- msg = new GridDhtPartitionsSingleRequest();
-
- break;
-
- case 49:
- msg = new GridNearGetRequest();
-
- break;
-
- case 50:
- msg = new GridNearGetResponse();
-
- break;
-
- case 51:
- msg = new GridNearLockRequest();
-
- break;
-
- case 52:
- msg = new GridNearLockResponse();
-
- break;
-
- case 53:
- msg = new GridNearTxFinishRequest();
-
- break;
-
- case 54:
- msg = new GridNearTxFinishResponse();
-
- break;
-
- case 55:
- msg = new GridNearTxPrepareRequest();
-
- break;
-
- case 56:
- msg = new GridNearTxPrepareResponse();
-
- break;
-
- case 57:
- msg = new GridNearUnlockRequest();
-
- break;
-
- case 58:
- msg = new GridCacheQueryRequest();
-
- break;
-
- case 59:
- msg = new GridCacheQueryResponse();
-
- break;
-
- case 61:
- msg = new GridContinuousMessage();
-
- break;
-
- case 62:
- msg = new DataStreamerRequest();
-
- break;
-
- case 63:
- msg = new DataStreamerResponse();
-
- break;
-
- case 64:
- msg = new IgfsAckMessage();
-
- break;
-
- case 65:
- msg = new IgfsBlockKey();
-
- break;
-
- case 66:
- msg = new IgfsBlocksMessage();
-
- break;
-
- case 67:
- msg = new IgfsDeleteMessage();
-
- break;
-
- case 68:
- msg = new IgfsFileAffinityRange();
-
- break;
-
- case 69:
- msg = new IgfsFragmentizerRequest();
-
- break;
-
- case 70:
- msg = new IgfsFragmentizerResponse();
-
- break;
-
- case 71:
- msg = new IgfsSyncMessage();
-
- break;
-
- case 76:
- msg = new GridTaskResultRequest();
-
- break;
-
- case 77:
- msg = new GridTaskResultResponse();
-
- break;
-
- case 78:
- msg = new MissingMappingRequestMessage();
-
- break;
-
- case 79:
- msg = new MissingMappingResponseMessage();
-
- break;
-
- case 80:
- msg = new MetadataRequestMessage();
-
- break;
-
- case 81:
- msg = new MetadataResponseMessage();
-
- break;
-
- case 82:
- msg = new JobStealingRequest();
-
- break;
-
- case 84:
- msg = new GridByteArrayList();
-
- break;
-
- case 85:
- msg = new GridLongList();
-
- break;
-
- case 86:
- msg = new GridCacheVersion();
-
- break;
-
- case 87:
- msg = new GridDhtPartitionExchangeId();
-
- break;
-
- case 88:
- msg = new GridCacheReturn();
-
- break;
-
- case 89:
- msg = new CacheObjectImpl();
-
- break;
-
- case 90:
- msg = new KeyCacheObjectImpl();
-
- break;
-
- case 91:
- msg = new GridCacheEntryInfo();
-
- break;
-
- case 92:
- msg = new CacheEntryInfoCollection();
-
- break;
-
- case 93:
- msg = new CacheInvokeDirectResult();
-
- break;
-
- case 94:
- msg = new IgniteTxKey();
-
- break;
-
- case 95:
- msg = new DataStreamerEntry();
-
- break;
-
- case 96:
- msg = new CacheContinuousQueryEntry();
-
- break;
-
- case 97:
- msg = new CacheEvictionEntry();
-
- break;
-
- case 98:
- msg = new CacheEntryPredicateContainsValue();
-
- break;
-
- case 99:
- msg = new CacheEntrySerializablePredicate();
-
- break;
-
- case 100:
- msg = new IgniteTxEntry();
-
- break;
-
- case 101:
- msg = new TxEntryValueHolder();
-
- break;
-
- case 102:
- msg = new CacheVersionedValue();
-
- break;
-
- case 103:
- msg = new GridCacheRawVersionedEntry<>();
-
- break;
-
- case 104:
- msg = new GridCacheVersionEx();
-
- break;
-
- case 105:
- msg = new CacheObjectByteArrayImpl();
-
- break;
-
- case 106:
- msg = new GridQueryCancelRequest();
-
- break;
-
- case 107:
- msg = new GridQueryFailResponse();
-
- break;
-
- case 108:
- msg = new GridQueryNextPageRequest();
-
- break;
-
- case 109:
- msg = new GridQueryNextPageResponse();
-
- break;
-
- case 110:
- // EMPTY type
- // GridQueryRequest was removed
- break;
-
- case 111:
- msg = new AffinityTopologyVersion();
-
- break;
-
- case 112:
- msg = new GridCacheSqlQuery();
-
- break;
-
- case 113:
- msg = new BinaryObjectImpl();
-
- break;
-
- case 114:
- msg = new GridDhtPartitionSupplyMessage();
-
- break;
-
- case 115:
- msg = new UUIDCollectionMessage();
-
- break;
-
- case 116:
- msg = new GridNearSingleGetRequest();
-
- break;
-
- case 117:
- msg = new GridNearSingleGetResponse();
-
- break;
-
- case 118:
- msg = new CacheContinuousQueryBatchAck();
-
- break;
-
- case 119:
- msg = new BinaryEnumObjectImpl();
-
- break;
-
- // [120..123] - DR
- case 124:
- msg = new GridMessageCollection<>();
-
- break;
-
- case 125:
- msg = new GridNearAtomicSingleUpdateRequest();
-
- break;
-
- case 126:
- msg = new GridNearAtomicSingleUpdateInvokeRequest();
-
- break;
-
- case 127:
- msg = new GridNearAtomicSingleUpdateFilterRequest();
-
- break;
-
- case 128:
- msg = new CacheGroupAffinityMessage();
-
- break;
-
- case 129:
- msg = new WalStateAckMessage();
-
- break;
-
- case 130:
- msg = new UserManagementOperationFinishedMessage();
-
- break;
-
- case 131:
- msg = new UserAuthenticateRequestMessage();
-
- break;
-
- case 132:
- msg = new UserAuthenticateResponseMessage();
-
- break;
-
- case 133:
- msg = new ClusterMetricsUpdateMessage();
-
- break;
-
- case 134:
- msg = new ContinuousRoutineStartResultMessage();
-
- break;
-
- case 135:
- msg = new LatchAckMessage();
-
- break;
-
- case 136:
- msg = new MvccTxSnapshotRequest();
-
- break;
-
- case 137:
- msg = new MvccAckRequestTx();
-
- break;
-
- case 138:
- msg = new MvccFutureResponse();
-
- break;
-
- case 139:
- msg = new MvccQuerySnapshotRequest();
-
- break;
-
- case 140:
- msg = new MvccAckRequestQueryCntr();
-
- break;
-
- case 141:
- msg = new MvccSnapshotResponse();
-
- break;
-
- case 143:
- msg = new GridCacheMvccEntryInfo();
-
- break;
-
- case 144:
- msg = new GridDhtTxQueryEnlistResponse();
-
- break;
-
- case 145:
- msg = new MvccAckRequestQueryId();
-
- break;
-
- case 146:
- msg = new MvccAckRequestTxAndQueryCntr();
-
- break;
-
- case 147:
- msg = new MvccAckRequestTxAndQueryId();
-
- break;
-
- case 148:
- msg = new MvccVersionImpl();
-
- break;
-
- case 149:
- msg = new MvccActiveQueriesMessage();
-
- break;
-
- case 150:
- msg = new MvccSnapshotWithoutTxs();
-
- break;
-
- case 151:
- msg = new GridNearTxQueryEnlistRequest();
-
- break;
-
- case 152:
- msg = new GridNearTxQueryEnlistResponse();
-
- break;
-
- case 153:
- msg = new GridNearTxQueryResultsEnlistRequest();
-
- break;
-
- case 154:
- msg = new GridNearTxQueryResultsEnlistResponse();
-
- break;
-
- case 155:
- msg = new GridDhtTxQueryEnlistRequest();
-
- break;
-
- case 156:
- msg = new GridDhtTxQueryFirstEnlistRequest();
-
- break;
-
- case 157:
- msg = new PartitionUpdateCountersMessage();
-
- break;
-
- case 158:
- msg = new GridDhtPartitionSupplyMessageV2();
-
- break;
-
- case 159:
- msg = new GridNearTxEnlistRequest();
-
- break;
-
- case 160:
- msg = new GridNearTxEnlistResponse();
-
- break;
-
- case 161:
- msg = new GridInvokeValue();
-
- break;
-
- case 162:
- msg = new GenerateEncryptionKeyRequest();
-
- break;
-
- case 163:
- msg = new GenerateEncryptionKeyResponse();
-
- break;
-
- case 164:
- msg = new MvccRecoveryFinishedMessage();
-
- break;
-
- case 165:
- msg = new PartitionCountersNeighborcastRequest();
-
- break;
-
- case 166:
- msg = new PartitionCountersNeighborcastResponse();
-
- break;
-
- case 167:
- msg = new ServiceDeploymentProcessId();
-
- break;
-
- case 168:
- msg = new ServiceSingleNodeDeploymentResultBatch();
-
- break;
-
- case 169:
- msg = new ServiceSingleNodeDeploymentResult();
-
- break;
-
- case 170:
- msg = new DeadlockProbe();
-
- break;
-
- case 171:
- msg = new ProbedTx();
-
- break;
-
- case GridQueryKillRequest.TYPE_CODE:
- msg = new GridQueryKillRequest();
-
- break;
-
- case GridQueryKillResponse.TYPE_CODE:
- msg = new GridQueryKillResponse();
-
- break;
-
- case GridIoSecurityAwareMessage.TYPE_CODE:
- msg = new GridIoSecurityAwareMessage();
-
- break;
-
- case SessionChannelMessage.TYPE_CODE:
- msg = new SessionChannelMessage();
-
- break;
-
- case SingleNodeMessage.TYPE_CODE:
- msg = new SingleNodeMessage<>();
-
- break;
-
- // [-3..119] [124..129] [-23..-28] [-36..-55] - this
- // [120..123] - DR
- // [-4..-22, -30..-35] - SQL
- // [2048..2053] - Snapshots
- default:
- if (ext != null) {
- for (MessageFactory factory : ext) {
- msg = factory.create(type);
-
- if (msg != null)
- break;
- }
- }
-
- if (msg == null) {
- IgniteOutClosure<Message> c = CUSTOM.get(type);
-
- if (c != null)
- msg = c.apply();
- }
- }
-
- if (msg == null)
- throw new IgniteException("Invalid message type: " + type);
-
- return msg;
- }
-
- /**
- * Registers factory for custom message. Used for test purposes.
- *
- * @param type Message type.
- * @param c Message producer.
- */
- public static void registerCustom(short type, IgniteOutClosure<Message> c) {
- assert c != null;
-
- CUSTOM.put(type, c);
+ throw new UnsupportedOperationException();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
new file mode 100644
index 0000000..eb89043
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Message factory implementation which is responsible for instantiation of all communication messages.
+ */
+public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
+ /** Offset. */
+ private static final int OFF = -Short.MIN_VALUE;
+
+ /** Array size. */
+ private static final int ARR_SIZE = 1 << Short.SIZE;
+
+ /** Custom messages registry. Used for test purposes. */
+ private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>();
+
+ /** Message suppliers. */
+ private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE);
+
+ /** Initialized flag. If {@code true} then new message type couldn't be registered. */
+ private boolean initialized;
+
+ /**
+ * Contructor.
+ *
+ * @param factories Concrete message factories or message factory providers. Cfn't be empty or {@code null}.
+ */
+ public IgniteMessageFactoryImpl(MessageFactory[] factories) {
+ if (factories == null || factories.length == 0)
+ throw new IllegalArgumentException("Message factory couldn't be initialized. Factories aren't provided.");
+
+ List<MessageFactory> old = new ArrayList<>(factories.length);
+
+ for (MessageFactory factory : factories) {
+ if (factory instanceof MessageFactoryProvider) {
+ MessageFactoryProvider p = (MessageFactoryProvider)factory;
+
+ p.registerAll(this);
+ }
+ else
+ old.add(factory);
+ }
+
+ if (!old.isEmpty()) {
+ for (int i = 0; i < ARR_SIZE; i++) {
+ Supplier<Message> curr = msgSuppliers[i];
+
+ if (curr == null) {
+ short directType = indexToDirectType(i);
+
+ for (MessageFactory factory : old) {
+ Message msg = factory.create(directType);
+
+ if (msg != null)
+ register(directType, () -> factory.create(directType));
+ }
+ }
+ }
+ }
+
+ initialized = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
+ if (initialized) {
+ throw new IllegalStateException("Message factory is already initialized. " +
+ "Registration of new message types is forbidden.");
+ }
+
+ int idx = directTypeToIndex(directType);
+
+ Supplier<Message> curr = msgSuppliers[idx];
+
+ if (curr == null)
+ msgSuppliers[idx] = supplier;
+ else
+ throw new IgniteException("Message factory is already registered for direct type: " + directType);
+ }
+
+ /**
+ * Creates new message instance of provided direct type.
+ * <p>
+ *
+ * @param directType Message direct type.
+ * @return Message instance.
+ * @throws IgniteException If there are no any message factory for given {@code directType}.
+ */
+ @Override public @Nullable Message create(short directType) {
+ Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];
+
+ if (supplier == null)
+ supplier = CUSTOM.get(directType);
+
+ if (supplier == null)
+ throw new IgniteException("Invalid message type: " + directType);
+
+ return supplier.get();
+ }
+
+ /**
+ * @param directType Direct type.
+ */
+ private static int directTypeToIndex(short directType) {
+ return directType + OFF;
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private static short indexToDirectType(int idx) {
+ int res = idx - OFF;
+
+ assert res >= Short.MIN_VALUE && res <= Short.MAX_VALUE;
+
+ return (short)res;
+ }
+
+ /**
+ * Registers factory for custom message. Used for test purposes.
+ *
+ * @param type Message type.
+ * @param c Message producer.
+ *
+ * @deprecated Should be removed. Please don't use this method anymore.
+ * Consider using of plugin with own message types.
+ */
+ @TestOnly
+ @Deprecated
+ public static void registerCustom(short type, Supplier<Message> c) {
+ assert c != null;
+
+ CUSTOM.put(type, c);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
new file mode 100644
index 0000000..ae159b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import java.util.function.Supplier;
+import org.apache.ignite.IgniteException;
+
+/**
+ * Message factory for all communication messages registered using {@link #register(short, Supplier)} method call.
+ */
+public interface IgniteMessageFactory extends MessageFactory {
+ /**
+ * Register message factory with given direct type. All messages must be registered during construction
+ * of class which implements this interface. Any invocation of this method after initialization is done must
+ * throw {@link IllegalStateException} exception.
+ *
+ * @param directType Direct type.
+ * @param supplier Message factory.
+ * @throws IgniteException In case of attempt to register message with direct type which is already registered.
+ * @throws IllegalStateException On any invocation of this method when class which implements this interface
+ * is alredy constructed.
+ */
+ public void register(short directType, Supplier<Message> supplier) throws IgniteException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
index 1ea88fb..07a3d5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -26,7 +26,10 @@ import org.jetbrains.annotations.Nullable;
* A plugin can provide his own message factory as an extension
* if it uses any custom messages (all message must extend
* {@link Message} class).
+ *
+ * @deprecated Use {@link MessageFactoryProvider} instead.
*/
+@Deprecated
public interface MessageFactory extends Extension {
/**
* Creates new message instance of provided type.
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
copy to modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
index 1ea88fb..910c68c 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
@@ -17,25 +17,30 @@
package org.apache.ignite.plugin.extensions.communication;
-import org.apache.ignite.plugin.Extension;
import org.jetbrains.annotations.Nullable;
/**
- * Factory for communication messages.
+ * Provider of communication message factories.
* <p>
- * A plugin can provide his own message factory as an extension
- * if it uses any custom messages (all message must extend
- * {@link Message} class).
+ * Implementation of this interface is responsible for registration of all message factories in
+ * {@link #registerAll} method.
+ * <p>
+ * {@link #registerAll} method's call is responsibility of {@link IgniteMessageFactory} implementation.
*/
-public interface MessageFactory extends Extension {
+public interface MessageFactoryProvider extends MessageFactory {
/**
- * Creates new message instance of provided type.
- * <p>
- * This method should return {@code null} if provided message type
- * is unknown to this factory.
+ * Registers all messages factories. See {@link IgniteMessageFactory#register}.
*
- * @param type Message type.
- * @return Message instance.
+ * @param factory {@link IgniteMessageFactory} implementation.
+ */
+ public void registerAll(IgniteMessageFactory factory);
+
+ /**
+ * Always throws {@link UnsupportedOperationException}.
+ * @param type Message direct type.
+ * @throws UnsupportedOperationException On any invocation.
*/
- @Nullable public Message create(short type);
+ @Override @Nullable public default Message create(short type) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index d89fe2c..1be4e6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -109,7 +109,7 @@ class TcpCommunicationMetricsListener {
private final Object msgTypMapMux = new Object();
/** Message type map. */
- private volatile Map<Short, String> msgTypMap;
+ private volatile Map<Short, String> msgTypeMap;
/** */
public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
@@ -285,7 +285,7 @@ class TcpCommunicationMetricsListener {
if (metric.name().startsWith(prefix)) {
short directType = Short.parseShort(metric.name().substring(prefix.length()));
- Map<Short, String> msgTypMap0 = msgTypMap;
+ Map<Short, String> msgTypMap0 = msgTypeMap;
if (msgTypMap0 != null) {
String typeName = msgTypMap0.get(directType);
@@ -374,24 +374,24 @@ class TcpCommunicationMetricsListener {
private void updateMessageTypeMap(Message msg) {
short typeId = msg.directType();
- Map<Short, String> msgTypMap0 = msgTypMap;
+ Map<Short, String> msgTypMap0 = msgTypeMap;
if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
synchronized (msgTypMapMux) {
- if (msgTypMap == null) {
+ if (msgTypeMap == null) {
msgTypMap0 = new HashMap<>();
msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypMap = msgTypMap0;
+ msgTypeMap = msgTypMap0;
}
else {
- if (!msgTypMap.containsKey(typeId)) {
- msgTypMap0 = new HashMap<>(msgTypMap);
+ if (!msgTypeMap.containsKey(typeId)) {
+ msgTypMap0 = new HashMap<>(msgTypeMap);
msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypMap = msgTypMap0;
+ msgTypeMap = msgTypMap0;
}
}
}
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
index 6bca88f..7704c0b 100644
--- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -2,3 +2,4 @@ org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteSta
org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider
org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin
org.apache.ignite.plugin.NodeValidationPluginProvider
+
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 108e4672..6dd103e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -22,11 +22,9 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
-import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiContext;
@@ -48,11 +46,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
private static final short DIRECT_TYPE = 210;
static {
- GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridIoUserMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new);
}
/** {@inheritDoc} */
@@ -180,7 +174,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
private IgniteSpiContext spiCtx;
/** Test message topic. **/
- private String TEST_TOPIC = "test_topic";
+ private static final String TEST_TOPIC = "test_topic";
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
@@ -192,6 +186,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
// No-op.
}
+ /** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
this.spiCtx = spiCtx;
@@ -203,6 +198,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
}
+ /** {@inheritDoc} */
@Override public void onContextDestroyed0() {
spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
@Override public boolean apply(UUID uuid, Object o) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 8a27a46..0a75cf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -46,20 +45,10 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
/** */
private static final short DIRECT_TYPE_OVER_BYTE = 1000;
- /** */
- private int bufSize;
-
static {
- GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestMessage();
- }
- });
- GridIoMessageFactory.registerCustom(DIRECT_TYPE_OVER_BYTE, new CO<Message>() {
- @Override public Message apply() {
- return new TestOverByteIdMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new);
+
+ IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
}
/** {@inheritDoc} */
@@ -108,8 +97,6 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
*/
@Test
public void testSendMessageWithBuffer() throws Exception {
- bufSize = 8192;
-
try {
startGridsMultiThreaded(2);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
new file mode 100644
index 0000000..a29177f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for default implementation of {@link IgniteMessageFactory} interface.
+ */
+public class IgniteMessageFactoryImplTest {
+ /** Test message 1 type. */
+ private static final short TEST_MSG_1_TYPE = 1;
+
+ /** Test message 2 type. */
+ private static final short TEST_MSG_2_TYPE = 2;
+
+ /** Unknown message type. */
+ private static final short UNKNOWN_MSG_TYPE = 0;
+
+ /**
+ * Tests that impossible register new message after initialization.
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testReadOnly() {
+ MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+ IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+ msgFactory.register((short)0, () -> null);
+ }
+
+ /**
+ * Tests that proper message type will be returned by message factory.
+ */
+ @Test
+ public void testCreate() {
+ MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+ IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+ Message msg;
+
+ msg = msgFactory.create(TEST_MSG_1_TYPE);
+ assertTrue(msg instanceof TestMessage1);
+
+ msg = msgFactory.create(TEST_MSG_2_TYPE);
+ assertTrue(msg instanceof TestMessage2);
+
+ msg = msgFactory.create(TEST_MSG_2_TYPE);
+ assertTrue(msg instanceof TestMessage2);
+ }
+
+ /**
+ * Tests that exception will be thrown for unknown message direct type.
+ */
+ @Test(expected = IgniteException.class)
+ public void testCreate_UnknownMessageType() {
+ MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
+
+ IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
+
+ msgFactory.create(UNKNOWN_MSG_TYPE);
+ }
+
+ /**
+ * Tests attemption of registration message with already registered message type.
+ */
+ @Test(expected = IgniteException.class)
+ @SuppressWarnings("ResultOfObjectAllocationIgnored")
+ public void testRegisterTheSameType() {
+ MessageFactory[] factories = {
+ new TestMessageFactoryPovider(),
+ new TestMessageFactory(),
+ new TestMessageFactoryPoviderWithTheSameDirectType()
+ };
+
+ new IgniteMessageFactoryImpl(factories);
+ }
+
+
+ /**
+ * {@link MessageFactoryProvider} implementation.
+ */
+ private static class TestMessageFactoryPovider implements MessageFactoryProvider {
+ /** {@inheritDoc} */
+ @Override public void registerAll(IgniteMessageFactory factory) {
+ factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+ }
+ }
+
+ /**
+ * {@link MessageFactoryProvider} implementation with message direct type which is already registered.
+ */
+ private static class TestMessageFactoryPoviderWithTheSameDirectType implements MessageFactoryProvider {
+ /** {@inheritDoc} */
+ @Override public void registerAll(IgniteMessageFactory factory) {
+ factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+ }
+ }
+
+ /**
+ * {@link MessageFactory} implementation whish still uses creation with switch-case.
+ */
+ private static class TestMessageFactory implements MessageFactory {
+ /** {@inheritDoc} */
+ @Override public @Nullable Message create(short type) {
+ switch (type) {
+ case TEST_MSG_2_TYPE:
+ return new TestMessage2();
+
+ default:
+ return null;
+ }
+ }
+ }
+
+ /** Test message. */
+ private static class TestMessage1 implements Message {
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+ }
+
+ /** Test message. */
+ private static class TestMessage2 implements Message {
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
new file mode 100644
index 0000000..19ab431
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/**
+ * Tests that node will not start if some component tries to register message factory with direct type
+ * for which message factory is already registered.
+ */
+public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
+ /** Test plugin name. */
+ private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN";
+
+ /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */
+ private static final short MSG_DIRECT_TYPE = -44;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setPluginProviders(new TestPluginProvider());
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Tests that node will not start if some component tries to register message factory with direct type
+ * for which message factory is already registered.
+ */
+ @Test
+ public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception {
+ assertThrows(log, this::startGrid, IgniteCheckedException.class,
+ "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE);
+ }
+
+ /** Plugin with own message factory. */
+ private static class TestPlugin implements IgnitePlugin {
+ }
+
+ /** */
+ public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return TEST_PLUGIN_NAME;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends IgnitePlugin> T plugin() {
+ return (T)new TestPlugin();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String version() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String copyright() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
+ registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
+ @Override public void registerAll(IgniteMessageFactory factory) {
+ factory.register(MSG_DIRECT_TYPE, TestMessage::new);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(PluginContext ctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onIgniteStop(boolean cancel) {
+ // no-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void validateNewNode(ClusterNode node, Serializable data) {
+ // No-op.
+ }
+ }
+
+ /** */
+ private static class TestPluginConfiguration implements PluginConfiguration {
+ }
+
+ /** Test message with already registered direct type. */
+ private static class TestMessage implements Message {
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return MSG_DIRECT_TYPE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+ }
+
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index ea94bd2..5c6082d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,10 +22,8 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.typedef.CO;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -42,11 +40,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*
*/
static {
- GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
}
/** {@inheritDoc} */
@@ -62,8 +56,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
* @return Cache configuration.
* @throws Exception In case of error.
*/
- protected CacheConfiguration cacheConfiguration() throws Exception {
- CacheConfiguration cfg = defaultCacheConfiguration();
+ protected CacheConfiguration<?, ?> cacheConfiguration() throws Exception {
+ CacheConfiguration<?, ?> cfg = defaultCacheConfiguration();
cfg.setCacheMode(PARTITIONED);
cfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -113,7 +107,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*/
@Test
public void testAddedDeploymentInfo() throws Exception {
- GridCacheContext ctx = cacheContext();
+ GridCacheContext<?, ?> ctx = cacheContext();
if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
assertFalse(ctx.deploymentEnabled());
@@ -137,7 +131,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*/
@Test
public void testAddedDeploymentInfo2() throws Exception {
- GridCacheContext ctx = cacheContext();
+ GridCacheContext<?, ?> ctx = cacheContext();
if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
assertFalse(ctx.deploymentEnabled());
@@ -161,8 +155,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
/**
* @return Cache context.
*/
- protected GridCacheContext cacheContext() {
- return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context();
+ protected GridCacheContext<?, ?> cacheContext() {
+ return ((IgniteCacheProxy<?, ?>)grid(0).cache(DEFAULT_CACHE_NAME)).context();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index aeaabda..49d588f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -33,9 +33,11 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -60,7 +62,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(atomicityMode());
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -153,7 +155,9 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
e0.writeTo(buf, writer);
CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
- e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1));
+ IgniteMessageFactoryImpl msgFactory =
+ new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+ e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(msgFactory, (byte)1));
assertEquals(e0.cacheId(), e1.cacheId());
assertEquals(e0.eventType(), e1.eventType());
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 8034093..326b716 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -29,9 +29,8 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
@@ -52,7 +51,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
* Super class for all communication self tests.
* @param <T> Type of communication SPI.
*/
-public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
/** */
private static long msgId = 1;
@@ -75,17 +74,13 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
private static GridTimeoutProcessor timeoutProcessor;
/** */
- protected boolean useSsl = false;
+ protected boolean useSsl;
/**
*
*/
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/** */
@@ -162,7 +157,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
for (ClusterNode node : nodes) {
synchronized (mux) {
if (!msgDestMap.containsKey(entry.getKey()))
- msgDestMap.put(entry.getKey(), new HashSet<UUID>());
+ msgDestMap.put(entry.getKey(), new HashSet<>());
msgDestMap.get(entry.getKey()).add(node.id());
}
@@ -208,7 +203,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
for (ClusterNode node : nodes) {
synchronized (mux) {
if (!msgDestMap.containsKey(sndId))
- msgDestMap.put(sndId, new HashSet<UUID>());
+ msgDestMap.put(sndId, new HashSet<>());
msgDestMap.get(sndId).add(node.id());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 797328e..21a846d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -31,14 +31,13 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoManager;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -66,35 +65,15 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
*
*/
static {
- GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
- GridIoMessageFactory.registerCustom(TestMessage1.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestMessage1();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new);
- GridIoMessageFactory.registerCustom(TestMessage2.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestMessage2();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new);
- GridIoMessageFactory.registerCustom(TestBadMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new TestBadMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
}
/** {@inheritDoc} */
@@ -105,7 +84,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
cfg.setFailureHandler(new TestFailureHandler());
- CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+ CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(CacheMode.PARTITIONED);
ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
@@ -146,25 +125,25 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
try {
startGrids(2);
- Ignite ignite0 = grid(0);
- Ignite ignite1 = grid(1);
+ IgniteEx ignite0 = grid(0);
+ IgniteEx ignite1 = grid(1);
- ((IgniteKernal)ignite0).context().cache().context().io().addCacheHandler(
+ ignite0.context().cache().context().io().addCacheHandler(
0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
throw new RuntimeException("Test bad message exception");
}
});
- ((IgniteKernal)ignite1).context().cache().context().io().addCacheHandler(
+ ignite1.context().cache().context().io().addCacheHandler(
0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
throw new RuntimeException("Test bad message exception");
}
});
- ((IgniteKernal)ignite0).context().cache().context().io().send(
- ((IgniteKernal)ignite1).localNode().id(), new TestBadMessage(), (byte)2);
+ ignite0.context().cache().context().io().send(
+ ignite1.localNode().id(), new TestBadMessage(), (byte)2);
boolean res = failureLatch.await(5, TimeUnit.SECONDS);
@@ -179,8 +158,8 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void doSend() throws Exception {
- GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
- GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
+ GridIoManager mgr0 = grid(0).context().io();
+ GridIoManager mgr1 = grid(1).context().io();
String topic = "test-topic";
@@ -195,14 +174,14 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
assertEquals(10, messages.size());
- int count = 0;
+ int cnt = 0;
for (TestMessage1 msg1 : messages) {
assertTrue(msg1.body().contains(TEST_BODY));
int i = Integer.parseInt(msg1.body().substring(TEST_BODY.length() + 1));
- assertEquals(count, i);
+ assertEquals(cnt, i);
TestMessage2 msg2 = (TestMessage2) msg1.message();
@@ -214,11 +193,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
GridTestMessage msg3 = (GridTestMessage) msg2.message();
- assertEquals(count, msg3.getMsgId());
+ assertEquals(cnt, msg3.getMsgId());
assertEquals(grid(1).localNode().id(), msg3.getSourceNodeId());
- count++;
+ cnt++;
}
}
catch (Exception e) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 7940a63..54e5386 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
/**
* Test for {@link TcpCommunicationSpi}
*/
-abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
+abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> {
/** */
private static final int SPI_COUNT = 3;
@@ -59,7 +59,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}
/** {@inheritDoc} */
- @Override protected CommunicationSpi getSpi(int idx) {
+ @Override protected CommunicationSpi<Message> getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
if (!useShmem)
@@ -88,7 +88,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
super.testSendToManyNodes();
// Test idle clients remove.
- for (CommunicationSpi spi : spis.values()) {
+ for (CommunicationSpi<Message> spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(getSpiCount() - 1, clients.size());
@@ -129,13 +129,13 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
final CyclicBarrier b = new CyclicBarrier(THREADS);
- List<IgniteInternalFuture> futs = new ArrayList<>();
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
- futs.add(GridTestUtils.runAsync(new Callable() {
- @Override public Object call() throws Exception {
+ futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
List<ClusterNode> checkNodes = new ArrayList<>(nodes);
assert checkNodes.size() > 1;
@@ -156,7 +156,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}));
}
- for (IgniteInternalFuture f : futs)
+ for (IgniteInternalFuture<?> f : futs)
f.get();
}
@@ -164,7 +164,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
@Override protected void afterTest() throws Exception {
super.afterTest();
- for (CommunicationSpi spi : spis.values()) {
+ for (CommunicationSpi<Message> spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
for (int i = 0; i < 20; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 7ab0d6f..69e4bef 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -37,13 +37,12 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -65,7 +64,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi>
+public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi<Message>>
extends GridSpiAbstractTest<T> {
/** */
private static final int SPI_CNT = 2;
@@ -101,11 +100,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
*
*/
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
@@ -356,24 +351,24 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
assertTrue(latch.await(10, TimeUnit.SECONDS));
- for (CommunicationSpi spi : spis) {
+ for (CommunicationSpi<?> spi : spis) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(1, clients.size());
- final GridNioServer srv = U.field(spi, "nioSrvr");
+ final GridNioServer<?> srv = U.field(spi, "nioSrvr");
final int conns = pairedConnections ? 2 : 1;
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- Collection sessions = U.field(srv, "sessions");
+ Collection<?> sessions = U.field(srv, "sessions");
return sessions.size() == conns * connectionsPerNode;
}
}, 5000);
- Collection sessions = U.field(srv, "sessions");
+ Collection<?> sessions = U.field(srv, "sessions");
assertEquals(conns * connectionsPerNode, sessions.size());
}
@@ -396,7 +391,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/**
* @return SPI.
*/
- private CommunicationSpi createSpi() {
+ private CommunicationSpi<Message> createSpi() {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
spi.setLocalAddress("127.0.0.1");
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index a53b43b..f99df2b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -38,14 +38,13 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -100,11 +99,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
private static boolean reject;
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 408eb10..d99f48f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -27,14 +27,13 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -56,7 +55,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -76,11 +75,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
*
*/
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
@@ -173,7 +168,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
final long totAcked0 = totAcked;
for (TcpCommunicationSpi spi : spis) {
- GridNioServer srv = U.field(spi, "nioSrvr");
+ GridNioServer<?> srv = U.field(spi, "nioSrvr");
final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
@@ -281,7 +276,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ClusterNode node0 = nodes.get(0);
ClusterNode node1 = nodes.get(1);
- final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+ final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
int msgId = 0;
@@ -341,7 +336,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
- final GridNioServer srv = U.field(spi, "nioSrvr");
+ final GridNioServer<?> srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5ec734a..1d03590 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,13 +32,12 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -61,7 +60,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -90,11 +89,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
*
*/
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
@@ -674,7 +669,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi, boolean in) throws Exception {
- final GridNioServer srv = U.field(spi, "nioSrvr");
+ final GridNioServer<?> srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index d937bb0..ef9b413 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
@@ -59,7 +58,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi<Message>>
extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -80,11 +79,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
*
*/
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
@@ -95,7 +90,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
}
/** */
- private class TestListener implements CommunicationListener<Message> {
+ private static class TestListener implements CommunicationListener<Message> {
/** */
private GridConcurrentHashSet<Long> msgIds = new GridConcurrentHashSet<>();
@@ -194,7 +189,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final long totAcked0 = totAcked;
for (TcpCommunicationSpi spi : spis) {
- GridNioServer srv = U.field(spi, "nioSrvr");
+ GridNioServer<?> srv = U.field(spi, "nioSrvr");
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
@@ -306,7 +301,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
// Check that session will not be closed by idle timeout because expected close by queue overflow.
assertTrue(spi0.getIdleConnectionTimeout() > awaitTime);
- final GridNioServer srv1 = U.field(spi1, "nioSrvr");
+ final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
// For prevent session close by write timeout.
srv1.writeTimeout(60_000);
@@ -392,7 +387,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
- final GridNioServer srv = U.field(spi, "nioSrvr");
+ final GridNioServer<?> srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 99840c8..5cb8999 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -32,12 +32,11 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
-import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteInClosure;
@@ -63,11 +62,7 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
private final CountDownLatch latch = new CountDownLatch(1);
static {
- GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
- @Override public Message apply() {
- return new GridTestMessage();
- }
- });
+ IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
}
/**
@@ -121,10 +116,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs",
SynchronizedCommunicationSpi.class.getSimpleName());
- MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
- if (mbeanServer.isRegistered(mbeanName))
- return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class,
+ if (mbeanSrv.isRegistered(mbeanName))
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TcpCommunicationSpiMBean.class,
true);
else
fail("MBean is not registered: " + mbeanName.getCanonicalName());
@@ -159,10 +154,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
latch.await(10, TimeUnit.SECONDS);
- ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+ ClusterGroup clusterGrpNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
// Send job from node0 to node1.
- grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
+ grid(0).compute(clusterGrpNode1).call(new IgniteCallable<Boolean>() {
@Override public Boolean call() throws Exception {
return Boolean.TRUE;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index da43ded..6424bfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -567,7 +568,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** {@inheritDoc} */
@Override public MessageFactory messageFactory() {
if (factory == null)
- factory = new GridIoMessageFactory(null);
+ factory = new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
return factory;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1f41a08..84aec2e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -45,7 +45,9 @@ import org.apache.ignite.internal.managers.communication.IgniteCommunicationBala
import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteCommunicationSslBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImplTest;
import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
+import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConflictTest;
import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
@@ -330,6 +332,9 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationBalanceMultipleConnectionsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationSslBalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, IgniteMessageFactoryImplTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, MessageDirectTypeIdConflictTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index ce04839..0720b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -21,7 +21,9 @@ import java.nio.ByteBuffer;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -122,7 +124,8 @@ public class GridMessageCollectionTest {
assertEquals(m.directType(), type);
- GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
+ IgniteMessageFactory msgFactory =
+ new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
Message mx = msgFactory.create(type);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 0ff53f7..e4aae7d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,107 +23,52 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
/**
* H2 Value message factory.
*/
-public class GridH2ValueMessageFactory implements MessageFactory {
+public class GridH2ValueMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
- @Nullable @Override public Message create(short type) {
- switch (type) {
- case -4:
- return GridH2Null.INSTANCE;
-
- case -5:
- return new GridH2Boolean();
-
- case -6:
- return new GridH2Byte();
-
- case -7:
- return new GridH2Short();
-
- case -8:
- return new GridH2Integer();
-
- case -9:
- return new GridH2Long();
-
- case -10:
- return new GridH2Decimal();
-
- case -11:
- return new GridH2Double();
-
- case -12:
- return new GridH2Float();
-
- case -13:
- return new GridH2Time();
-
- case -14:
- return new GridH2Date();
-
- case -15:
- return new GridH2Timestamp();
-
- case -16:
- return new GridH2Bytes();
-
- case -17:
- return new GridH2String();
-
- case -18:
- return new GridH2Array();
-
- case -19:
- return new GridH2JavaObject();
-
- case -20:
- return new GridH2Uuid();
-
- case -21:
- return new GridH2Geometry();
-
- case -22:
- return new GridH2CacheObject();
-
- case -30:
- return new GridH2IndexRangeRequest();
-
- case -31:
- return new GridH2IndexRangeResponse();
-
- case -32:
- return new GridH2RowMessage();
-
- case -33:
- return new GridH2QueryRequest();
-
- case -34:
- return new GridH2RowRange();
-
- case -35:
- return new GridH2RowRangeBounds();
-
- case -54:
- return new QueryTable();
-
- case -55:
- return new GridH2DmlRequest();
-
- case -56:
- return new GridH2DmlResponse();
-
- case -57:
- return new GridH2SelectForUpdateTxDetails();
- }
+ @Override public void registerAll(IgniteMessageFactory factory) {
+ factory.register((short)-4, () -> GridH2Null.INSTANCE);
+ factory.register((short)-5, GridH2Boolean::new);
+ factory.register((short)-6, GridH2Byte::new);
+ factory.register((short)-7, GridH2Short::new);
+ factory.register((short)-8, GridH2Integer::new);
+ factory.register((short)-9, GridH2Long::new);
+ factory.register((short)-10, GridH2Decimal::new);
+ factory.register((short)-11, GridH2Double::new);
+ factory.register((short)-12, GridH2Float::new);
+ factory.register((short)-13, GridH2Time::new);
+ factory.register((short)-14, GridH2Date::new);
+ factory.register((short)-15, GridH2Timestamp::new);
+ factory.register((short)-16, GridH2Bytes::new);
+ factory.register((short)-17, GridH2String::new);
+ factory.register((short)-18, GridH2Array::new);
+ factory.register((short)-19, GridH2JavaObject::new);
+ factory.register((short)-20, GridH2Uuid::new);
+ factory.register((short)-21, GridH2Geometry::new);
+ factory.register((short)-22, GridH2CacheObject::new);
+ factory.register((short)-30, GridH2IndexRangeRequest::new);
+ factory.register((short)-31, GridH2IndexRangeResponse::new);
+ factory.register((short)-32, GridH2RowMessage::new);
+ factory.register((short)-33, GridH2QueryRequest::new);
+ factory.register((short)-34, GridH2RowRange::new);
+ factory.register((short)-35, GridH2RowRangeBounds::new);
+ factory.register((short)-54, QueryTable::new);
+ factory.register((short)-55, GridH2DmlRequest::new);
+ factory.register((short)-56, GridH2DmlResponse::new);
+ factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
+ }
- return null;
+ /** {@inheritDoc} */
+ @Override @Nullable public Message create(short type) {
+ throw new UnsupportedOperationException();
}
/**