You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/08/31 16:24:56 UTC
[ignite] 03/03: Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch ignite-2.9-revert-12568
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit ed52559eb95c913e4b6ebc1b334f60c27ddbac26
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Mon Aug 31 19:24:24 2020 +0300
Revert "IGNITE-12568 MessageFactory is refactored in order to detect registration of message with the same direct type"
This reverts commit 65c30ec6
---
.../managers/communication/GridIoManager.java | 4 +-
.../communication/GridIoMessageFactory.java | 1127 ++++++++++++++++----
.../communication/IgniteMessageFactoryImpl.java | 166 ---
.../communication/IgniteMessageFactory.java | 39 -
.../extensions/communication/MessageFactory.java | 3 -
.../communication/MessageFactoryProvider.java | 46 -
.../tcp/TcpCommunicationMetricsListener.java | 16 +-
.../org.apache.ignite.plugin.PluginProvider | 1 -
.../GridManagerLocalMessageListenerSelfTest.java | 14 +-
.../GridCommunicationSendMessageSelfTest.java | 19 +-
.../IgniteMessageFactoryImplTest.java | 198 ----
.../MessageDirectTypeIdConflictTest.java | 210 ----
.../GridCacheConditionalDeploymentSelfTest.java | 22 +-
...niteCacheContinuousQueryImmutableEntryTest.java | 8 +-
.../GridAbstractCommunicationSelfTest.java | 17 +-
.../communication/GridCacheMessageSelfTest.java | 61 +-
.../tcp/GridTcpCommunicationSpiAbstractTest.java | 16 +-
...pCommunicationSpiConcurrentConnectSelfTest.java | 21 +-
...idTcpCommunicationSpiMultithreadedSelfTest.java | 9 +-
...GridTcpCommunicationSpiRecoveryAckSelfTest.java | 17 +-
.../GridTcpCommunicationSpiRecoverySelfTest.java | 13 +-
...TcpCommunicationRecoveryAckClosureSelfTest.java | 19 +-
.../tcp/TcpCommunicationStatisticsTest.java | 35 +-
.../ignite/testframework/GridSpiTestContext.java | 3 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 5 -
.../ignite/util/GridMessageCollectionTest.java | 5 +-
.../h2/twostep/msg/GridH2ValueMessageFactory.java | 129 ++-
27 files changed, 1220 insertions(+), 1003 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 552613f..28c5881 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -517,8 +517,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
List<MessageFactory> compMsgs = new ArrayList<>();
- compMsgs.add(new GridIoMessageFactory());
-
for (IgniteComponentType compType : IgniteComponentType.values()) {
MessageFactory f = compType.messageFactory();
@@ -529,7 +527,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (!compMsgs.isEmpty())
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactory[compMsgs.size()]));
- msgFactory = new IgniteMessageFactoryImpl(msgs);
+ msgFactory = new GridIoMessageFactory(msgs);
if (log.isDebugEnabled())
log.debug(startInfo());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 84a84cd..69685ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.managers.communication;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobExecuteRequest;
import org.apache.ignite.internal.GridJobExecuteResponse;
@@ -179,9 +182,9 @@ import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridMessageCollection;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
+import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
@@ -194,198 +197,940 @@ import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMess
/**
* Message factory implementation.
*/
-public class GridIoMessageFactory implements MessageFactoryProvider {
- /** {@inheritDoc} */
- @Override public void registerAll(IgniteMessageFactory factory) {
- // -54 is reserved for SQL.
- // -46 ... -51 - snapshot messages.
- factory.register((short)-61, IgniteDiagnosticMessage::new);
- factory.register((short)-53, SchemaOperationStatusMessage::new);
- factory.register((short)-52, GridIntList::new);
- factory.register((short)-51, NearCacheUpdates::new);
- factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
- factory.register((short)-49, UpdateErrors::new);
- factory.register((short)-48, GridDhtAtomicNearResponse::new);
- factory.register((short)-45, GridChangeGlobalStateMessageResponse::new);
- factory.register((short)-44, HandshakeMessage2::new);
- factory.register((short)-43, IgniteIoTestMessage::new);
- factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
- factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new);
- factory.register((short)-26, TxLockList::new);
- factory.register((short)-25, TxLock::new);
- factory.register((short)-24, TxLocksRequest::new);
- factory.register((short)-23, TxLocksResponse::new);
- factory.register(TcpCommunicationSpi.NODE_ID_MSG_TYPE, NodeIdMessage::new);
- factory.register(TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE, RecoveryLastReceivedMessage::new);
- factory.register(TcpCommunicationSpi.HANDSHAKE_MSG_TYPE, HandshakeMessage::new);
- factory.register(TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE, HandshakeWaitMessage::new);
- factory.register((short)0, GridJobCancelRequest::new);
- factory.register((short)1, GridJobExecuteRequest::new);
- factory.register((short)2, GridJobExecuteResponse::new);
- factory.register((short)3, GridJobSiblingsRequest::new);
- factory.register((short)4, GridJobSiblingsResponse::new);
- factory.register((short)5, GridTaskCancelRequest::new);
- factory.register((short)6, GridTaskSessionRequest::new);
- factory.register((short)7, GridCheckpointRequest::new);
- factory.register((short)8, GridIoMessage::new);
- factory.register((short)9, GridIoUserMessage::new);
- factory.register((short)10, GridDeploymentInfoBean::new);
- factory.register((short)11, GridDeploymentRequest::new);
- factory.register((short)12, GridDeploymentResponse::new);
- factory.register((short)13, GridEventStorageMessage::new);
- factory.register((short)16, GridCacheTxRecoveryRequest::new);
- factory.register((short)17, GridCacheTxRecoveryResponse::new);
- factory.register((short)20, GridCacheTtlUpdateRequest::new);
- factory.register((short)21, GridDistributedLockRequest::new);
- factory.register((short)22, GridDistributedLockResponse::new);
- factory.register((short)23, GridDistributedTxFinishRequest::new);
- factory.register((short)24, GridDistributedTxFinishResponse::new);
- factory.register((short)25, GridDistributedTxPrepareRequest::new);
- factory.register((short)26, GridDistributedTxPrepareResponse::new);
- factory.register((short)27, GridDistributedUnlockRequest::new);
- factory.register((short)28, GridDhtAffinityAssignmentRequest::new);
- factory.register((short)29, GridDhtAffinityAssignmentResponse::new);
- factory.register((short)30, GridDhtLockRequest::new);
- factory.register((short)31, GridDhtLockResponse::new);
- factory.register((short)32, GridDhtTxFinishRequest::new);
- factory.register((short)33, GridDhtTxFinishResponse::new);
- factory.register((short)34, GridDhtTxPrepareRequest::new);
- factory.register((short)35, GridDhtTxPrepareResponse::new);
- factory.register((short)36, GridDhtUnlockRequest::new);
- factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
- factory.register((short)38, GridDhtAtomicUpdateRequest::new);
- factory.register((short)39, GridDhtAtomicUpdateResponse::new);
- factory.register((short)40, GridNearAtomicFullUpdateRequest::new);
- factory.register((short)41, GridNearAtomicUpdateResponse::new);
- factory.register((short)42, GridDhtForceKeysRequest::new);
- factory.register((short)43, GridDhtForceKeysResponse::new);
- factory.register((short)44, GridDhtPartitionDemandLegacyMessage::new);
- factory.register((short)45, GridDhtPartitionDemandMessage::new);
- factory.register((short)46, GridDhtPartitionsFullMessage::new);
- factory.register((short)47, GridDhtPartitionsSingleMessage::new);
- factory.register((short)48, GridDhtPartitionsSingleRequest::new);
- factory.register((short)49, GridNearGetRequest::new);
- factory.register((short)50, GridNearGetResponse::new);
- factory.register((short)51, GridNearLockRequest::new);
- factory.register((short)52, GridNearLockResponse::new);
- factory.register((short)53, GridNearTxFinishRequest::new);
- factory.register((short)54, GridNearTxFinishResponse::new);
- factory.register((short)55, GridNearTxPrepareRequest::new);
- factory.register((short)56, GridNearTxPrepareResponse::new);
- factory.register((short)57, GridNearUnlockRequest::new);
- factory.register((short)58, GridCacheQueryRequest::new);
- factory.register((short)59, GridCacheQueryResponse::new);
- factory.register((short)61, GridContinuousMessage::new);
- factory.register((short)62, DataStreamerRequest::new);
- factory.register((short)63, DataStreamerResponse::new);
- factory.register((short)76, GridTaskResultRequest::new);
- factory.register((short)77, GridTaskResultResponse::new);
- factory.register((short)78, MissingMappingRequestMessage::new);
- factory.register((short)79, MissingMappingResponseMessage::new);
- factory.register((short)80, MetadataRequestMessage::new);
- factory.register((short)81, MetadataResponseMessage::new);
- factory.register((short)82, JobStealingRequest::new);
- factory.register((short)84, GridByteArrayList::new);
- factory.register((short)85, GridLongList::new);
- factory.register((short)86, GridCacheVersion::new);
- factory.register((short)87, GridDhtPartitionExchangeId::new);
- factory.register((short)88, GridCacheReturn::new);
- factory.register((short)89, CacheObjectImpl::new);
- factory.register((short)90, KeyCacheObjectImpl::new);
- factory.register((short)91, GridCacheEntryInfo::new);
- factory.register((short)92, CacheEntryInfoCollection::new);
- factory.register((short)93, CacheInvokeDirectResult::new);
- factory.register((short)94, IgniteTxKey::new);
- factory.register((short)95, DataStreamerEntry::new);
- factory.register((short)96, CacheContinuousQueryEntry::new);
- factory.register((short)97, CacheEvictionEntry::new);
- factory.register((short)98, CacheEntryPredicateContainsValue::new);
- factory.register((short)99, CacheEntrySerializablePredicate::new);
- factory.register((short)100, IgniteTxEntry::new);
- factory.register((short)101, TxEntryValueHolder::new);
- factory.register((short)102, CacheVersionedValue::new);
- factory.register((short)103, GridCacheRawVersionedEntry::new);
- factory.register((short)104, GridCacheVersionEx::new);
- factory.register((short)105, CacheObjectByteArrayImpl::new);
- factory.register((short)106, GridQueryCancelRequest::new);
- factory.register((short)107, GridQueryFailResponse::new);
- factory.register((short)108, GridQueryNextPageRequest::new);
- factory.register((short)109, GridQueryNextPageResponse::new);
- factory.register((short)111, AffinityTopologyVersion::new);
- factory.register((short)112, GridCacheSqlQuery::new);
- factory.register((short)113, BinaryObjectImpl::new);
- factory.register((short)114, GridDhtPartitionSupplyMessage::new);
- factory.register((short)115, UUIDCollectionMessage::new);
- factory.register((short)116, GridNearSingleGetRequest::new);
- factory.register((short)117, GridNearSingleGetResponse::new);
- factory.register((short)118, CacheContinuousQueryBatchAck::new);
- factory.register((short)119, BinaryEnumObjectImpl::new);
-
- // [120..123] - DR
- factory.register((short)124, GridMessageCollection::new);
- factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
- factory.register((short)126, GridNearAtomicSingleUpdateInvokeRequest::new);
- factory.register((short)127, GridNearAtomicSingleUpdateFilterRequest::new);
- factory.register((short)128, CacheGroupAffinityMessage::new);
- factory.register((short)129, WalStateAckMessage::new);
- factory.register((short)130, UserManagementOperationFinishedMessage::new);
- factory.register((short)131, UserAuthenticateRequestMessage::new);
- factory.register((short)132, UserAuthenticateResponseMessage::new);
- factory.register((short)133, ClusterMetricsUpdateMessage::new);
- factory.register((short)134, ContinuousRoutineStartResultMessage::new);
- factory.register((short)135, LatchAckMessage::new);
- factory.register((short)136, MvccTxSnapshotRequest::new);
- factory.register((short)137, MvccAckRequestTx::new);
- factory.register((short)138, MvccFutureResponse::new);
- factory.register((short)139, MvccQuerySnapshotRequest::new);
- factory.register((short)140, MvccAckRequestQueryCntr::new);
- factory.register((short)141, MvccSnapshotResponse::new);
- factory.register((short)143, GridCacheMvccEntryInfo::new);
- factory.register((short)144, GridDhtTxQueryEnlistResponse::new);
- factory.register((short)145, MvccAckRequestQueryId::new);
- factory.register((short)146, MvccAckRequestTxAndQueryCntr::new);
- factory.register((short)147, MvccAckRequestTxAndQueryId::new);
- factory.register((short)148, MvccVersionImpl::new);
- factory.register((short)149, MvccActiveQueriesMessage::new);
- factory.register((short)150, MvccSnapshotWithoutTxs::new);
- factory.register((short)151, GridNearTxQueryEnlistRequest::new);
- factory.register((short)152, GridNearTxQueryEnlistResponse::new);
- factory.register((short)153, GridNearTxQueryResultsEnlistRequest::new);
- factory.register((short)154, GridNearTxQueryResultsEnlistResponse::new);
- factory.register((short)155, GridDhtTxQueryEnlistRequest::new);
- factory.register((short)156, GridDhtTxQueryFirstEnlistRequest::new);
- factory.register((short)157, PartitionUpdateCountersMessage::new);
- factory.register((short)158, GridDhtPartitionSupplyMessageV2::new);
- factory.register((short)159, GridNearTxEnlistRequest::new);
- factory.register((short)160, GridNearTxEnlistResponse::new);
- factory.register((short)161, GridInvokeValue::new);
- factory.register((short)162, GenerateEncryptionKeyRequest::new);
- factory.register((short)163, GenerateEncryptionKeyResponse::new);
- factory.register((short)164, MvccRecoveryFinishedMessage::new);
- factory.register((short)165, PartitionCountersNeighborcastRequest::new);
- factory.register((short)166, PartitionCountersNeighborcastResponse::new);
- factory.register((short)167, ServiceDeploymentProcessId::new);
- factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new);
- factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
- factory.register((short)170, DeadlockProbe::new);
- factory.register((short)171, ProbedTx::new);
- factory.register(GridQueryKillRequest.TYPE_CODE, GridQueryKillRequest::new);
- factory.register(GridQueryKillResponse.TYPE_CODE, GridQueryKillResponse::new);
- factory.register(GridIoSecurityAwareMessage.TYPE_CODE, GridIoSecurityAwareMessage::new);
- factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
- factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
- factory.register((short)177, TcpInverseConnectionResponseMessage::new);
-
- // [-3..119] [124..129] [-23..-28] [-36..-55] - this
- // [120..123] - DR
- // [-4..-22, -30..-35] - SQL
- // [2048..2053] - Snapshots
- // [-42..-37] - former hadoop.
- // [64..71] - former IGFS.
+public class GridIoMessageFactory implements MessageFactory {
+ /** Custom messages registry. Used for test purposes. */
+ private static final Map<Short, IgniteOutClosure<Message>> CUSTOM = new ConcurrentHashMap<>();
+
+ /** Extensions. */
+ private final MessageFactory[] ext;
+
+ /**
+ * @param ext Extensions.
+ */
+ public GridIoMessageFactory(MessageFactory[] ext) {
+ this.ext = ext;
}
/** {@inheritDoc} */
@Override public Message create(short type) {
- throw new UnsupportedOperationException();
+ Message msg = null;
+
+ switch (type) {
+ // -54 is reserved for SQL.
+ // -46 ... -51 - snapshot messages.
+ case -61:
+ msg = new IgniteDiagnosticMessage();
+
+ break;
+
+ case -53:
+ msg = new SchemaOperationStatusMessage();
+
+ break;
+
+ case -52:
+ msg = new GridIntList();
+
+ break;
+
+ case -51:
+ msg = new NearCacheUpdates();
+
+ break;
+
+ case -50:
+ msg = new GridNearAtomicCheckUpdateRequest();
+
+ break;
+
+ case -49:
+ msg = new UpdateErrors();
+
+ break;
+
+ case -48:
+ msg = new GridDhtAtomicNearResponse();
+
+ break;
+
+ case -45:
+ msg = new GridChangeGlobalStateMessageResponse();
+
+ break;
+
+ case -44:
+ msg = new HandshakeMessage2();
+
+ break;
+
+ case -43:
+ msg = new IgniteIoTestMessage();
+
+ break;
+
+ case -36:
+ msg = new GridDhtAtomicSingleUpdateRequest();
+
+ break;
+
+ case -27:
+ msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+ break;
+
+ case -26:
+ msg = new TxLockList();
+
+ break;
+
+ case -25:
+ msg = new TxLock();
+
+ break;
+
+ case -24:
+ msg = new TxLocksRequest();
+
+ break;
+
+ case -23:
+ msg = new TxLocksResponse();
+
+ break;
+
+ case TcpCommunicationSpi.NODE_ID_MSG_TYPE:
+ msg = new NodeIdMessage();
+
+ break;
+
+ case TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE:
+ msg = new RecoveryLastReceivedMessage();
+
+ break;
+
+ case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE:
+ msg = new HandshakeMessage();
+
+ break;
+
+ case TcpCommunicationSpi.HANDSHAKE_WAIT_MSG_TYPE:
+ msg = new HandshakeWaitMessage();
+
+ break;
+
+ case 0:
+ msg = new GridJobCancelRequest();
+
+ break;
+
+ case 1:
+ msg = new GridJobExecuteRequest();
+
+ break;
+
+ case 2:
+ msg = new GridJobExecuteResponse();
+
+ break;
+
+ case 3:
+ msg = new GridJobSiblingsRequest();
+
+ break;
+
+ case 4:
+ msg = new GridJobSiblingsResponse();
+
+ break;
+
+ case 5:
+ msg = new GridTaskCancelRequest();
+
+ break;
+
+ case 6:
+ msg = new GridTaskSessionRequest();
+
+ break;
+
+ case 7:
+ msg = new GridCheckpointRequest();
+
+ break;
+
+ case 8:
+ msg = new GridIoMessage();
+
+ break;
+
+ case 9:
+ msg = new GridIoUserMessage();
+
+ break;
+
+ case 10:
+ msg = new GridDeploymentInfoBean();
+
+ break;
+
+ case 11:
+ msg = new GridDeploymentRequest();
+
+ break;
+
+ case 12:
+ msg = new GridDeploymentResponse();
+
+ break;
+
+ case 13:
+ msg = new GridEventStorageMessage();
+
+ break;
+
+ case 16:
+ msg = new GridCacheTxRecoveryRequest();
+
+ break;
+
+ case 17:
+ msg = new GridCacheTxRecoveryResponse();
+
+ break;
+
+ case 20:
+ msg = new GridCacheTtlUpdateRequest();
+
+ break;
+
+ case 21:
+ msg = new GridDistributedLockRequest();
+
+ break;
+
+ case 22:
+ msg = new GridDistributedLockResponse();
+
+ break;
+
+ case 23:
+ msg = new GridDistributedTxFinishRequest();
+
+ break;
+
+ case 24:
+ msg = new GridDistributedTxFinishResponse();
+
+ break;
+
+ case 25:
+ msg = new GridDistributedTxPrepareRequest();
+
+ break;
+
+ case 26:
+ msg = new GridDistributedTxPrepareResponse();
+
+ break;
+
+ case 27:
+ msg = new GridDistributedUnlockRequest();
+
+ break;
+
+ case 28:
+ msg = new GridDhtAffinityAssignmentRequest();
+
+ break;
+
+ case 29:
+ msg = new GridDhtAffinityAssignmentResponse();
+
+ break;
+
+ case 30:
+ msg = new GridDhtLockRequest();
+
+ break;
+
+ case 31:
+ msg = new GridDhtLockResponse();
+
+ break;
+
+ case 32:
+ msg = new GridDhtTxFinishRequest();
+
+ break;
+
+ case 33:
+ msg = new GridDhtTxFinishResponse();
+
+ break;
+
+ case 34:
+ msg = new GridDhtTxPrepareRequest();
+
+ break;
+
+ case 35:
+ msg = new GridDhtTxPrepareResponse();
+
+ break;
+
+ case 36:
+ msg = new GridDhtUnlockRequest();
+
+ break;
+
+ case 37:
+ msg = new GridDhtAtomicDeferredUpdateResponse();
+
+ break;
+
+ case 38:
+ msg = new GridDhtAtomicUpdateRequest();
+
+ break;
+
+ case 39:
+ msg = new GridDhtAtomicUpdateResponse();
+
+ break;
+
+ case 40:
+ msg = new GridNearAtomicFullUpdateRequest();
+
+ break;
+
+ case 41:
+ msg = new GridNearAtomicUpdateResponse();
+
+ break;
+
+ case 42:
+ msg = new GridDhtForceKeysRequest();
+
+ break;
+
+ case 43:
+ msg = new GridDhtForceKeysResponse();
+
+ break;
+
+ case 44:
+ msg = new GridDhtPartitionDemandLegacyMessage();
+
+ break;
+
+ case 45:
+ msg = new GridDhtPartitionDemandMessage();
+
+ break;
+
+ case 46:
+ msg = new GridDhtPartitionsFullMessage();
+
+ break;
+
+ case 47:
+ msg = new GridDhtPartitionsSingleMessage();
+
+ break;
+
+ case 48:
+ msg = new GridDhtPartitionsSingleRequest();
+
+ break;
+
+ case 49:
+ msg = new GridNearGetRequest();
+
+ break;
+
+ case 50:
+ msg = new GridNearGetResponse();
+
+ break;
+
+ case 51:
+ msg = new GridNearLockRequest();
+
+ break;
+
+ case 52:
+ msg = new GridNearLockResponse();
+
+ break;
+
+ case 53:
+ msg = new GridNearTxFinishRequest();
+
+ break;
+
+ case 54:
+ msg = new GridNearTxFinishResponse();
+
+ break;
+
+ case 55:
+ msg = new GridNearTxPrepareRequest();
+
+ break;
+
+ case 56:
+ msg = new GridNearTxPrepareResponse();
+
+ break;
+
+ case 57:
+ msg = new GridNearUnlockRequest();
+
+ break;
+
+ case 58:
+ msg = new GridCacheQueryRequest();
+
+ break;
+
+ case 59:
+ msg = new GridCacheQueryResponse();
+
+ break;
+
+ case 61:
+ msg = new GridContinuousMessage();
+
+ break;
+
+ case 62:
+ msg = new DataStreamerRequest();
+
+ break;
+
+ case 63:
+ msg = new DataStreamerResponse();
+
+ break;
+
+ case 76:
+ msg = new GridTaskResultRequest();
+
+ break;
+
+ case 77:
+ msg = new GridTaskResultResponse();
+
+ break;
+
+ case 78:
+ msg = new MissingMappingRequestMessage();
+
+ break;
+
+ case 79:
+ msg = new MissingMappingResponseMessage();
+
+ break;
+
+ case 80:
+ msg = new MetadataRequestMessage();
+
+ break;
+
+ case 81:
+ msg = new MetadataResponseMessage();
+
+ break;
+
+ case 82:
+ msg = new JobStealingRequest();
+
+ break;
+
+ case 84:
+ msg = new GridByteArrayList();
+
+ break;
+
+ case 85:
+ msg = new GridLongList();
+
+ break;
+
+ case 86:
+ msg = new GridCacheVersion();
+
+ break;
+
+ case 87:
+ msg = new GridDhtPartitionExchangeId();
+
+ break;
+
+ case 88:
+ msg = new GridCacheReturn();
+
+ break;
+
+ case 89:
+ msg = new CacheObjectImpl();
+
+ break;
+
+ case 90:
+ msg = new KeyCacheObjectImpl();
+
+ break;
+
+ case 91:
+ msg = new GridCacheEntryInfo();
+
+ break;
+
+ case 92:
+ msg = new CacheEntryInfoCollection();
+
+ break;
+
+ case 93:
+ msg = new CacheInvokeDirectResult();
+
+ break;
+
+ case 94:
+ msg = new IgniteTxKey();
+
+ break;
+
+ case 95:
+ msg = new DataStreamerEntry();
+
+ break;
+
+ case 96:
+ msg = new CacheContinuousQueryEntry();
+
+ break;
+
+ case 97:
+ msg = new CacheEvictionEntry();
+
+ break;
+
+ case 98:
+ msg = new CacheEntryPredicateContainsValue();
+
+ break;
+
+ case 99:
+ msg = new CacheEntrySerializablePredicate();
+
+ break;
+
+ case 100:
+ msg = new IgniteTxEntry();
+
+ break;
+
+ case 101:
+ msg = new TxEntryValueHolder();
+
+ break;
+
+ case 102:
+ msg = new CacheVersionedValue();
+
+ break;
+
+ case 103:
+ msg = new GridCacheRawVersionedEntry<>();
+
+ break;
+
+ case 104:
+ msg = new GridCacheVersionEx();
+
+ break;
+
+ case 105:
+ msg = new CacheObjectByteArrayImpl();
+
+ break;
+
+ case 106:
+ msg = new GridQueryCancelRequest();
+
+ break;
+
+ case 107:
+ msg = new GridQueryFailResponse();
+
+ break;
+
+ case 108:
+ msg = new GridQueryNextPageRequest();
+
+ break;
+
+ case 109:
+ msg = new GridQueryNextPageResponse();
+
+ break;
+
+ case 110:
+ // EMPTY type
+ // GridQueryRequest was removed
+ break;
+
+ case 111:
+ msg = new AffinityTopologyVersion();
+
+ break;
+
+ case 112:
+ msg = new GridCacheSqlQuery();
+
+ break;
+
+ case 113:
+ msg = new BinaryObjectImpl();
+
+ break;
+
+ case 114:
+ msg = new GridDhtPartitionSupplyMessage();
+
+ break;
+
+ case 115:
+ msg = new UUIDCollectionMessage();
+
+ break;
+
+ case 116:
+ msg = new GridNearSingleGetRequest();
+
+ break;
+
+ case 117:
+ msg = new GridNearSingleGetResponse();
+
+ break;
+
+ case 118:
+ msg = new CacheContinuousQueryBatchAck();
+
+ break;
+
+ case 119:
+ msg = new BinaryEnumObjectImpl();
+
+ break;
+
+ // [120..123] - DR
+ case 124:
+ msg = new GridMessageCollection<>();
+
+ break;
+
+ case 125:
+ msg = new GridNearAtomicSingleUpdateRequest();
+
+ break;
+
+ case 126:
+ msg = new GridNearAtomicSingleUpdateInvokeRequest();
+
+ break;
+
+ case 127:
+ msg = new GridNearAtomicSingleUpdateFilterRequest();
+
+ break;
+
+ case 128:
+ msg = new CacheGroupAffinityMessage();
+
+ break;
+
+ case 129:
+ msg = new WalStateAckMessage();
+
+ break;
+
+ case 130:
+ msg = new UserManagementOperationFinishedMessage();
+
+ break;
+
+ case 131:
+ msg = new UserAuthenticateRequestMessage();
+
+ break;
+
+ case 132:
+ msg = new UserAuthenticateResponseMessage();
+
+ break;
+
+ case 133:
+ msg = new ClusterMetricsUpdateMessage();
+
+ break;
+
+ case 134:
+ msg = new ContinuousRoutineStartResultMessage();
+
+ break;
+
+ case 135:
+ msg = new LatchAckMessage();
+
+ break;
+
+ case 136:
+ msg = new MvccTxSnapshotRequest();
+
+ break;
+
+ case 137:
+ msg = new MvccAckRequestTx();
+
+ break;
+
+ case 138:
+ msg = new MvccFutureResponse();
+
+ break;
+
+ case 139:
+ msg = new MvccQuerySnapshotRequest();
+
+ break;
+
+ case 140:
+ msg = new MvccAckRequestQueryCntr();
+
+ break;
+
+ case 141:
+ msg = new MvccSnapshotResponse();
+
+ break;
+
+ case 143:
+ msg = new GridCacheMvccEntryInfo();
+
+ break;
+
+ case 144:
+ msg = new GridDhtTxQueryEnlistResponse();
+
+ break;
+
+ case 145:
+ msg = new MvccAckRequestQueryId();
+
+ break;
+
+ case 146:
+ msg = new MvccAckRequestTxAndQueryCntr();
+
+ break;
+
+ case 147:
+ msg = new MvccAckRequestTxAndQueryId();
+
+ break;
+
+ case 148:
+ msg = new MvccVersionImpl();
+
+ break;
+
+ case 149:
+ msg = new MvccActiveQueriesMessage();
+
+ break;
+
+ case 150:
+ msg = new MvccSnapshotWithoutTxs();
+
+ break;
+
+ case 151:
+ msg = new GridNearTxQueryEnlistRequest();
+
+ break;
+
+ case 152:
+ msg = new GridNearTxQueryEnlistResponse();
+
+ break;
+
+ case 153:
+ msg = new GridNearTxQueryResultsEnlistRequest();
+
+ break;
+
+ case 154:
+ msg = new GridNearTxQueryResultsEnlistResponse();
+
+ break;
+
+ case 155:
+ msg = new GridDhtTxQueryEnlistRequest();
+
+ break;
+
+ case 156:
+ msg = new GridDhtTxQueryFirstEnlistRequest();
+
+ break;
+
+ case 157:
+ msg = new PartitionUpdateCountersMessage();
+
+ break;
+
+ case 158:
+ msg = new GridDhtPartitionSupplyMessageV2();
+
+ break;
+
+ case 159:
+ msg = new GridNearTxEnlistRequest();
+
+ break;
+
+ case 160:
+ msg = new GridNearTxEnlistResponse();
+
+ break;
+
+ case 161:
+ msg = new GridInvokeValue();
+
+ break;
+
+ case 162:
+ msg = new GenerateEncryptionKeyRequest();
+
+ break;
+
+ case 163:
+ msg = new GenerateEncryptionKeyResponse();
+
+ break;
+
+ case 164:
+ msg = new MvccRecoveryFinishedMessage();
+
+ break;
+
+ case 165:
+ msg = new PartitionCountersNeighborcastRequest();
+
+ break;
+
+ case 166:
+ msg = new PartitionCountersNeighborcastResponse();
+
+ break;
+
+ case 167:
+ msg = new ServiceDeploymentProcessId();
+
+ break;
+
+ case 168:
+ msg = new ServiceSingleNodeDeploymentResultBatch();
+
+ break;
+
+ case 169:
+ msg = new ServiceSingleNodeDeploymentResult();
+
+ break;
+
+ case 170:
+ msg = new DeadlockProbe();
+
+ break;
+
+ case 171:
+ msg = new ProbedTx();
+
+ break;
+
+ case GridQueryKillRequest.TYPE_CODE:
+ msg = new GridQueryKillRequest();
+
+ break;
+
+ case GridQueryKillResponse.TYPE_CODE:
+ msg = new GridQueryKillResponse();
+
+ break;
+
+ case GridIoSecurityAwareMessage.TYPE_CODE:
+ msg = new GridIoSecurityAwareMessage();
+
+ break;
+
+ case SessionChannelMessage.TYPE_CODE:
+ msg = new SessionChannelMessage();
+
+ break;
+
+ case SingleNodeMessage.TYPE_CODE:
+ msg = new SingleNodeMessage<>();
+
+ break;
+
+ case 177:
+ msg = new TcpInverseConnectionResponseMessage();
+
+ break;
+
+ // [-3..119] [124..129] [-23..-28] [-36..-55] - this
+ // [120..123] - DR
+ // [-4..-22, -30..-35] - SQL
+ // [2048..2053] - Snapshots
+ default:
+ if (ext != null) {
+ for (MessageFactory factory : ext) {
+ msg = factory.create(type);
+
+ if (msg != null)
+ break;
+ }
+ }
+
+ if (msg == null) {
+ IgniteOutClosure<Message> c = CUSTOM.get(type);
+
+ if (c != null)
+ msg = c.apply();
+ }
+ }
+
+ if (msg == null)
+ throw new IgniteException("Invalid message type: " + type);
+
+ return msg;
+ }
+
+ /**
+ * Registers factory for custom message. Used for test purposes.
+ *
+ * @param type Message type.
+ * @param c Message producer.
+ */
+ public static void registerCustom(short type, IgniteOutClosure<Message> c) {
+ assert c != null;
+
+ CUSTOM.put(type, c);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
deleted file mode 100644
index eb89043..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
-
-/**
- * Message factory implementation which is responsible for instantiation of all communication messages.
- */
-public class IgniteMessageFactoryImpl implements IgniteMessageFactory {
- /** Offset. */
- private static final int OFF = -Short.MIN_VALUE;
-
- /** Array size. */
- private static final int ARR_SIZE = 1 << Short.SIZE;
-
- /** Custom messages registry. Used for test purposes. */
- private static final Map<Short, Supplier<Message>> CUSTOM = new ConcurrentHashMap<>();
-
- /** Message suppliers. */
- private final Supplier<Message>[] msgSuppliers = (Supplier<Message>[]) Array.newInstance(Supplier.class, ARR_SIZE);
-
- /** Initialized flag. If {@code true} then new message type couldn't be registered. */
- private boolean initialized;
-
- /**
- * Constructor.
- *
- * @param factories Concrete message factories or message factory providers. Can't be empty or {@code null}.
- */
- public IgniteMessageFactoryImpl(MessageFactory[] factories) {
- if (factories == null || factories.length == 0)
- throw new IllegalArgumentException("Message factory couldn't be initialized. Factories aren't provided.");
-
- List<MessageFactory> old = new ArrayList<>(factories.length);
-
- for (MessageFactory factory : factories) {
- if (factory instanceof MessageFactoryProvider) {
- MessageFactoryProvider p = (MessageFactoryProvider)factory;
-
- p.registerAll(this);
- }
- else
- old.add(factory);
- }
-
- if (!old.isEmpty()) {
- for (int i = 0; i < ARR_SIZE; i++) {
- Supplier<Message> curr = msgSuppliers[i];
-
- if (curr == null) {
- short directType = indexToDirectType(i);
-
- for (MessageFactory factory : old) {
- Message msg = factory.create(directType);
-
- if (msg != null)
- register(directType, () -> factory.create(directType));
- }
- }
- }
- }
-
- initialized = true;
- }
-
- /** {@inheritDoc} */
- @Override public void register(short directType, Supplier<Message> supplier) throws IgniteException {
- if (initialized) {
- throw new IllegalStateException("Message factory is already initialized. " +
- "Registration of new message types is forbidden.");
- }
-
- int idx = directTypeToIndex(directType);
-
- Supplier<Message> curr = msgSuppliers[idx];
-
- if (curr == null)
- msgSuppliers[idx] = supplier;
- else
- throw new IgniteException("Message factory is already registered for direct type: " + directType);
- }
-
- /**
- * Creates new message instance of provided direct type.
- * <p>
- *
- * @param directType Message direct type.
- * @return Message instance.
- * @throws IgniteException If there is no message factory for the given {@code directType}.
- */
- @Override public @Nullable Message create(short directType) {
- Supplier<Message> supplier = msgSuppliers[directTypeToIndex(directType)];
-
- if (supplier == null)
- supplier = CUSTOM.get(directType);
-
- if (supplier == null)
- throw new IgniteException("Invalid message type: " + directType);
-
- return supplier.get();
- }
-
- /**
- * @param directType Direct type.
- */
- private static int directTypeToIndex(short directType) {
- return directType + OFF;
- }
-
- /**
- * @param idx Index.
- */
- private static short indexToDirectType(int idx) {
- int res = idx - OFF;
-
- assert res >= Short.MIN_VALUE && res <= Short.MAX_VALUE;
-
- return (short)res;
- }
-
- /**
- * Registers factory for custom message. Used for test purposes.
- *
- * @param type Message type.
- * @param c Message producer.
- *
- * @deprecated Should be removed. Please don't use this method anymore.
- * Consider using of plugin with own message types.
- */
- @TestOnly
- @Deprecated
- public static void registerCustom(short type, Supplier<Message> c) {
- assert c != null;
-
- CUSTOM.put(type, c);
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
deleted file mode 100644
index ae159b3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IgniteMessageFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import java.util.function.Supplier;
-import org.apache.ignite.IgniteException;
-
-/**
- * Message factory for all communication messages registered using {@link #register(short, Supplier)} method call.
- */
-public interface IgniteMessageFactory extends MessageFactory {
- /**
- * Register message factory with given direct type. All messages must be registered during construction
- * of class which implements this interface. Any invocation of this method after initialization is done must
- * throw {@link IllegalStateException} exception.
- *
- * @param directType Direct type.
- * @param supplier Message factory.
- * @throws IgniteException In case of attempt to register message with direct type which is already registered.
- * @throws IllegalStateException On any invocation of this method when class which implements this interface
- * is already constructed.
- */
- public void register(short directType, Supplier<Message> supplier) throws IgniteException;
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
index 07a3d5b..1ea88fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -26,10 +26,7 @@ import org.jetbrains.annotations.Nullable;
* A plugin can provide its own message factory as an extension
* if it uses any custom messages (all messages must implement the
* {@link Message} interface).
- *
- * @deprecated Use {@link MessageFactoryProvider} instead.
*/
-@Deprecated
public interface MessageFactory extends Extension {
/**
* Creates new message instance of provided type.
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
deleted file mode 100644
index 910c68c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactoryProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Provider of communication message factories.
- * <p>
- * Implementation of this interface is responsible for registration of all message factories in
- * {@link #registerAll} method.
- * <p>
- * {@link #registerAll} method's call is responsibility of {@link IgniteMessageFactory} implementation.
- */
-public interface MessageFactoryProvider extends MessageFactory {
- /**
- * Registers all messages factories. See {@link IgniteMessageFactory#register}.
- *
- * @param factory {@link IgniteMessageFactory} implementation.
- */
- public void registerAll(IgniteMessageFactory factory);
-
- /**
- * Always throws {@link UnsupportedOperationException}.
- * @param type Message direct type.
- * @throws UnsupportedOperationException On any invocation.
- */
- @Override @Nullable public default Message create(short type) {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
index afece1f..361409b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -109,7 +109,7 @@ class TcpCommunicationMetricsListener {
private final Object msgTypMapMux = new Object();
/** Message type map. */
- private volatile Map<Short, String> msgTypeMap;
+ private volatile Map<Short, String> msgTypMap;
/** */
public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
@@ -285,7 +285,7 @@ class TcpCommunicationMetricsListener {
if (metric.name().startsWith(prefix)) {
short directType = Short.parseShort(metric.name().substring(prefix.length()));
- Map<Short, String> msgTypMap0 = msgTypeMap;
+ Map<Short, String> msgTypMap0 = msgTypMap;
if (msgTypMap0 != null) {
String typeName = msgTypMap0.get(directType);
@@ -374,24 +374,24 @@ class TcpCommunicationMetricsListener {
private void updateMessageTypeMap(Message msg) {
short typeId = msg.directType();
- Map<Short, String> msgTypMap0 = msgTypeMap;
+ Map<Short, String> msgTypMap0 = msgTypMap;
if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
synchronized (msgTypMapMux) {
- if (msgTypeMap == null) {
+ if (msgTypMap == null) {
msgTypMap0 = new HashMap<>();
msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypMap0;
+ msgTypMap = msgTypMap0;
}
else {
- if (!msgTypeMap.containsKey(typeId)) {
- msgTypMap0 = new HashMap<>(msgTypeMap);
+ if (!msgTypMap.containsKey(typeId)) {
+ msgTypMap0 = new HashMap<>(msgTypMap);
msgTypMap0.put(typeId, msg.getClass().getName());
- msgTypeMap = msgTypMap0;
+ msgTypMap = msgTypMap0;
}
}
}
diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
index 7704c0b..6bca88f 100644
--- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
+++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider
@@ -2,4 +2,3 @@ org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteSta
org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider
org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin
org.apache.ignite.plugin.NodeValidationPluginProvider
-
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
index 6dd103e..108e4672 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/GridManagerLocalMessageListenerSelfTest.java
@@ -22,9 +22,11 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiContext;
@@ -46,7 +48,11 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
private static final short DIRECT_TYPE = 210;
static {
- IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, GridIoUserMessage::new);
+ GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridIoUserMessage();
+ }
+ });
}
/** {@inheritDoc} */
@@ -174,7 +180,7 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
private IgniteSpiContext spiCtx;
/** Test message topic. **/
- private static final String TEST_TOPIC = "test_topic";
+ private String TEST_TOPIC = "test_topic";
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String igniteInstanceName) throws IgniteSpiException {
@@ -186,7 +192,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
// No-op.
}
- /** {@inheritDoc} */
@Override public void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException {
this.spiCtx = spiCtx;
@@ -198,7 +203,6 @@ public class GridManagerLocalMessageListenerSelfTest extends GridCommonAbstractT
}
- /** {@inheritDoc} */
@Override public void onContextDestroyed0() {
spiCtx.removeLocalMessageListener(TEST_TOPIC, new IgniteBiPredicate<UUID, Object>() {
@Override public boolean apply(UUID uuid, Object o) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index 0a75cf5..8a27a46 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -45,10 +46,20 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
/** */
private static final short DIRECT_TYPE_OVER_BYTE = 1000;
- static {
- IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE, TestMessage::new);
+ /** */
+ private int bufSize;
- IgniteMessageFactoryImpl.registerCustom(DIRECT_TYPE_OVER_BYTE, TestOverByteIdMessage::new);
+ static {
+ GridIoMessageFactory.registerCustom(DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestMessage();
+ }
+ });
+ GridIoMessageFactory.registerCustom(DIRECT_TYPE_OVER_BYTE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestOverByteIdMessage();
+ }
+ });
}
/** {@inheritDoc} */
@@ -97,6 +108,8 @@ public class GridCommunicationSendMessageSelfTest extends GridCommonAbstractTest
*/
@Test
public void testSendMessageWithBuffer() throws Exception {
+ bufSize = 8192;
+
try {
startGridsMultiThreaded(2);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
deleted file mode 100644
index c55cc0d..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for default implementation of {@link IgniteMessageFactory} interface.
- */
-public class IgniteMessageFactoryImplTest {
- /** Test message 1 type. */
- private static final short TEST_MSG_1_TYPE = 1;
-
- /** Test message 2 type. */
- private static final short TEST_MSG_2_TYPE = 2;
-
- /** Unknown message type. */
- private static final short UNKNOWN_MSG_TYPE = 0;
-
- /**
- * Tests that it is impossible to register a new message after initialization.
- */
- @Test(expected = IllegalStateException.class)
- public void testReadOnly() {
- MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
- IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
- msgFactory.register((short)0, () -> null);
- }
-
- /**
- * Tests that proper message type will be returned by message factory.
- */
- @Test
- public void testCreate() {
- MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
- IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
- Message msg;
-
- msg = msgFactory.create(TEST_MSG_1_TYPE);
- assertTrue(msg instanceof TestMessage1);
-
- msg = msgFactory.create(TEST_MSG_2_TYPE);
- assertTrue(msg instanceof TestMessage2);
-
- msg = msgFactory.create(TEST_MSG_2_TYPE);
- assertTrue(msg instanceof TestMessage2);
- }
-
- /**
- * Tests that exception will be thrown for unknown message direct type.
- */
- @Test(expected = IgniteException.class)
- public void testCreate_UnknownMessageType() {
- MessageFactory[] factories = {new TestMessageFactoryPovider(), new TestMessageFactory()};
-
- IgniteMessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
-
- msgFactory.create(UNKNOWN_MSG_TYPE);
- }
-
- /**
- * Tests an attempt to register a message with an already registered message type.
- */
- @Test(expected = IgniteException.class)
- @SuppressWarnings("ResultOfObjectAllocationIgnored")
- public void testRegisterTheSameType() {
- MessageFactory[] factories = {
- new TestMessageFactoryPovider(),
- new TestMessageFactory(),
- new TestMessageFactoryPoviderWithTheSameDirectType()
- };
-
- new IgniteMessageFactoryImpl(factories);
- }
-
- /**
- * {@link MessageFactoryProvider} implementation.
- */
- private static class TestMessageFactoryPovider implements MessageFactoryProvider {
- /** {@inheritDoc} */
- @Override public void registerAll(IgniteMessageFactory factory) {
- factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
- }
- }
-
- /**
- * {@link MessageFactoryProvider} implementation with message direct type which is already registered.
- */
- private static class TestMessageFactoryPoviderWithTheSameDirectType implements MessageFactoryProvider {
- /** {@inheritDoc} */
- @Override public void registerAll(IgniteMessageFactory factory) {
- factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
- }
- }
-
- /**
- * {@link MessageFactory} implementation which still uses creation with switch-case.
- */
- private static class TestMessageFactory implements MessageFactory {
- /** {@inheritDoc} */
- @Override public @Nullable Message create(short type) {
- switch (type) {
- case TEST_MSG_2_TYPE:
- return new TestMessage2();
-
- default:
- return null;
- }
- }
- }
-
- /** Test message. */
- private static class TestMessage1 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
- }
-
- /** Test message. */
- private static class TestMessage2 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 2;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
deleted file mode 100644
index 6046f4a..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.managers.communication;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-import java.util.UUID;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.plugin.CachePluginContext;
-import org.apache.ignite.plugin.CachePluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.IgnitePlugin;
-import org.apache.ignite.plugin.PluginConfiguration;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.PluginProvider;
-import org.apache.ignite.plugin.PluginValidationException;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-import org.junit.Test;
-
-import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
-
-/**
- * Tests that node will not start if some component tries to register message factory with direct type
- * for which message factory is already registered.
- */
-public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
- /** Test plugin name. */
- private static final String TEST_PLUGIN_NAME = "TEST_PLUGIN";
-
- /** Message direct type. Message with this direct type will be registered by {@link GridIoMessageFactory} first. */
- private static final short MSG_DIRECT_TYPE = -44;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
-
- cfg.setPluginProviders(new TestPluginProvider());
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- super.afterTest();
-
- stopAllGrids();
- }
-
- /**
- * Tests that node will not start if some component tries to register message factory with direct type
- * for which message factory is already registered.
- */
- @Test
- public void testRegisterMessageFactoryWithConflictDirectTypeId() throws Exception {
- assertThrows(log, (Callable<Object>)this::startGrid, IgniteCheckedException.class,
- "Message factory is already registered for direct type: " + MSG_DIRECT_TYPE);
- }
-
- /** Plugin with own message factory. */
- private static class TestPlugin implements IgnitePlugin {
- }
-
- /** */
- public static class TestPluginProvider implements PluginProvider<TestPluginConfiguration> {
- /** {@inheritDoc} */
- @Override public String name() {
- return TEST_PLUGIN_NAME;
- }
-
- /** {@inheritDoc} */
- @Override public <T extends IgnitePlugin> T plugin() {
- return (T)new TestPlugin();
- }
-
- /** {@inheritDoc} */
- @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String version() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String copyright() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) throws IgniteCheckedException {
- registry.registerExtension(MessageFactory.class, new MessageFactoryProvider() {
- @Override public void registerAll(IgniteMessageFactory factory) {
- factory.register(MSG_DIRECT_TYPE, TestMessage::new);
- }
- });
- }
-
- /** {@inheritDoc} */
- @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void start(PluginContext ctx) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onIgniteStart() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onIgniteStop(boolean cancel) {
- // no-op.
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void validateNewNode(ClusterNode node, Serializable data) {
- // No-op.
- }
- }
-
- /** */
- private static class TestPluginConfiguration implements PluginConfiguration {
- }
-
- /** Test message with already registered direct type. */
- private static class TestMessage implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return MSG_DIRECT_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
- }
-
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index aaa42b1..12490d4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,8 +22,10 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -40,7 +42,11 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+ GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestMessage();
+ }
+ });
}
/** {@inheritDoc} */
@@ -56,8 +62,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
* @return Cache configuration.
* @throws Exception In case of error.
*/
- protected CacheConfiguration<?, ?> cacheConfiguration() throws Exception {
- CacheConfiguration<?, ?> cfg = defaultCacheConfiguration();
+ protected CacheConfiguration cacheConfiguration() throws Exception {
+ CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setCacheMode(PARTITIONED);
cfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -107,7 +113,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*/
@Test
public void testAddedDeploymentInfo() throws Exception {
- GridCacheContext<?, ?> ctx = cacheContext();
+ GridCacheContext ctx = cacheContext();
if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
assertFalse(ctx.deploymentEnabled());
@@ -131,7 +137,7 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
*/
@Test
public void testAddedDeploymentInfo2() throws Exception {
- GridCacheContext<?, ?> ctx = cacheContext();
+ GridCacheContext ctx = cacheContext();
if (grid(0).configuration().getMarshaller() instanceof BinaryMarshaller)
assertFalse(ctx.deploymentEnabled());
@@ -155,8 +161,8 @@ public class GridCacheConditionalDeploymentSelfTest extends GridCommonAbstractTe
/**
* @return Cache context.
*/
- protected GridCacheContext<?, ?> cacheContext() {
- return ((IgniteCacheProxy<?, ?>)grid(0).cache(DEFAULT_CACHE_NAME)).context();
+ protected GridCacheContext cacheContext() {
+ return ((IgniteCacheProxy)grid(0).cache(DEFAULT_CACHE_NAME)).context();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index 49d588f..aeaabda 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -33,11 +33,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -62,7 +60,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
- CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+ CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(PARTITIONED);
ccfg.setAtomicityMode(atomicityMode());
ccfg.setWriteSynchronizationMode(FULL_SYNC);
@@ -155,9 +153,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
e0.writeTo(buf, writer);
CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
- IgniteMessageFactoryImpl msgFactory =
- new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
- e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(msgFactory, (byte)1));
+ e1.readFrom(ByteBuffer.wrap(buf.array()), new DirectMessageReader(new GridIoMessageFactory(null), (byte)1));
assertEquals(e0.cacheId(), e1.cacheId());
assertEquals(e0.eventType(), e1.eventType());
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
index 326b716..8034093 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java
@@ -29,8 +29,9 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
@@ -51,7 +52,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
* Super class for all communication self tests.
* @param <T> Type of communication SPI.
*/
-public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
/** */
private static long msgId = 1;
@@ -74,13 +75,17 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
private static GridTimeoutProcessor timeoutProcessor;
/** */
- protected boolean useSsl;
+ protected boolean useSsl = false;
/**
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/** */
@@ -157,7 +162,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
for (ClusterNode node : nodes) {
synchronized (mux) {
if (!msgDestMap.containsKey(entry.getKey()))
- msgDestMap.put(entry.getKey(), new HashSet<>());
+ msgDestMap.put(entry.getKey(), new HashSet<UUID>());
msgDestMap.get(entry.getKey()).add(node.id());
}
@@ -203,7 +208,7 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS
for (ClusterNode node : nodes) {
synchronized (mux) {
if (!msgDestMap.containsKey(sndId))
- msgDestMap.put(sndId, new HashSet<>());
+ msgDestMap.put(sndId, new HashSet<UUID>());
msgDestMap.get(sndId).add(node.id());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
index 21a846d..797328e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridCacheMessageSelfTest.java
@@ -31,13 +31,14 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -65,15 +66,35 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(TestMessage.DIRECT_TYPE, TestMessage::new);
+ GridIoMessageFactory.registerCustom(TestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestMessage();
+ }
+ });
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
- IgniteMessageFactoryImpl.registerCustom(TestMessage1.DIRECT_TYPE, TestMessage1::new);
+ GridIoMessageFactory.registerCustom(TestMessage1.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestMessage1();
+ }
+ });
- IgniteMessageFactoryImpl.registerCustom(TestMessage2.DIRECT_TYPE, TestMessage2::new);
+ GridIoMessageFactory.registerCustom(TestMessage2.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestMessage2();
+ }
+ });
- IgniteMessageFactoryImpl.registerCustom(TestBadMessage.DIRECT_TYPE, TestBadMessage::new);
+ GridIoMessageFactory.registerCustom(TestBadMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new TestBadMessage();
+ }
+ });
}
/** {@inheritDoc} */
@@ -84,7 +105,7 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
cfg.setFailureHandler(new TestFailureHandler());
- CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+ CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
ccfg.setCacheMode(CacheMode.PARTITIONED);
ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
@@ -125,25 +146,25 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
try {
startGrids(2);
- IgniteEx ignite0 = grid(0);
- IgniteEx ignite1 = grid(1);
+ Ignite ignite0 = grid(0);
+ Ignite ignite1 = grid(1);
- ignite0.context().cache().context().io().addCacheHandler(
+ ((IgniteKernal)ignite0).context().cache().context().io().addCacheHandler(
0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
throw new RuntimeException("Test bad message exception");
}
});
- ignite1.context().cache().context().io().addCacheHandler(
+ ((IgniteKernal)ignite1).context().cache().context().io().addCacheHandler(
0, TestBadMessage.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
throw new RuntimeException("Test bad message exception");
}
});
- ignite0.context().cache().context().io().send(
- ignite1.localNode().id(), new TestBadMessage(), (byte)2);
+ ((IgniteKernal)ignite0).context().cache().context().io().send(
+ ((IgniteKernal)ignite1).localNode().id(), new TestBadMessage(), (byte)2);
boolean res = failureLatch.await(5, TimeUnit.SECONDS);
@@ -158,8 +179,8 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void doSend() throws Exception {
- GridIoManager mgr0 = grid(0).context().io();
- GridIoManager mgr1 = grid(1).context().io();
+ GridIoManager mgr0 = ((IgniteKernal)grid(0)).context().io();
+ GridIoManager mgr1 = ((IgniteKernal)grid(1)).context().io();
String topic = "test-topic";
@@ -174,14 +195,14 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
assertEquals(10, messages.size());
- int cnt = 0;
+ int count = 0;
for (TestMessage1 msg1 : messages) {
assertTrue(msg1.body().contains(TEST_BODY));
int i = Integer.parseInt(msg1.body().substring(TEST_BODY.length() + 1));
- assertEquals(cnt, i);
+ assertEquals(count, i);
TestMessage2 msg2 = (TestMessage2) msg1.message();
@@ -193,11 +214,11 @@ public class GridCacheMessageSelfTest extends GridCommonAbstractTest {
GridTestMessage msg3 = (GridTestMessage) msg2.message();
- assertEquals(cnt, msg3.getMsgId());
+ assertEquals(count, msg3.getMsgId());
assertEquals(grid(1).localNode().id(), msg3.getSourceNodeId());
- cnt++;
+ count++;
}
}
catch (Exception e) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 54e5386..7940a63 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
/**
* Test for {@link TcpCommunicationSpi}
*/
-abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi<Message>> {
+abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunicationSelfTest<CommunicationSpi> {
/** */
private static final int SPI_COUNT = 3;
@@ -59,7 +59,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}
/** {@inheritDoc} */
- @Override protected CommunicationSpi<Message> getSpi(int idx) {
+ @Override protected CommunicationSpi getSpi(int idx) {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
if (!useShmem)
@@ -88,7 +88,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
super.testSendToManyNodes();
// Test idle clients remove.
- for (CommunicationSpi<Message> spi : spis.values()) {
+ for (CommunicationSpi spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(getSpiCount() - 1, clients.size());
@@ -129,13 +129,13 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
final CyclicBarrier b = new CyclicBarrier(THREADS);
- List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+ List<IgniteInternalFuture> futs = new ArrayList<>();
for (Map.Entry<UUID, CommunicationSpi<Message>> entry : spis.entrySet()) {
final TcpCommunicationSpi spi = (TcpCommunicationSpi)entry.getValue();
- futs.add(GridTestUtils.runAsync(new Callable<Void>() {
- @Override public Void call() throws Exception {
+ futs.add(GridTestUtils.runAsync(new Callable() {
+ @Override public Object call() throws Exception {
List<ClusterNode> checkNodes = new ArrayList<>(nodes);
assert checkNodes.size() > 1;
@@ -156,7 +156,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
}));
}
- for (IgniteInternalFuture<?> f : futs)
+ for (IgniteInternalFuture f : futs)
f.get();
}
@@ -164,7 +164,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
@Override protected void afterTest() throws Exception {
super.afterTest();
- for (CommunicationSpi<Message> spi : spis.values()) {
+ for (CommunicationSpi spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
for (int i = 0; i < 20; i++) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 69e4bef..7ab0d6f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -37,12 +37,13 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -64,7 +65,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi<Message>>
+public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends CommunicationSpi>
extends GridSpiAbstractTest<T> {
/** */
private static final int SPI_CNT = 2;
@@ -100,7 +101,11 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
@@ -351,24 +356,24 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
assertTrue(latch.await(10, TimeUnit.SECONDS));
- for (CommunicationSpi<?> spi : spis) {
+ for (CommunicationSpi spi : spis) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(1, clients.size());
- final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ final GridNioServer srv = U.field(spi, "nioSrvr");
final int conns = pairedConnections ? 2 : 1;
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
- Collection<?> sessions = U.field(srv, "sessions");
+ Collection sessions = U.field(srv, "sessions");
return sessions.size() == conns * connectionsPerNode;
}
}, 5000);
- Collection<?> sessions = U.field(srv, "sessions");
+ Collection sessions = U.field(srv, "sessions");
assertEquals(conns * connectionsPerNode, sessions.size());
}
@@ -391,7 +396,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
/**
* @return SPI.
*/
- private CommunicationSpi<Message> createSpi() {
+ private CommunicationSpi createSpi() {
TcpCommunicationSpi spi = new TcpCommunicationSpi();
spi.setLocalAddress("127.0.0.1");
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index f99df2b..a53b43b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -38,13 +38,14 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,7 +100,11 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
private static boolean reject;
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index d99f48f..408eb10 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -27,13 +27,14 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -55,7 +56,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -75,7 +76,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
@@ -168,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
final long totAcked0 = totAcked;
for (TcpCommunicationSpi spi : spis) {
- GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ GridNioServer srv = U.field(spi, "nioSrvr");
final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
@@ -276,7 +281,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
ClusterNode node0 = nodes.get(0);
ClusterNode node1 = nodes.get(1);
- final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
+ final GridNioServer srv1 = U.field(spi1, "nioSrvr");
int msgId = 0;
@@ -336,7 +341,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
- final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ final GridNioServer srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 1d03590..5ec734a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -32,12 +32,13 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -60,7 +61,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<Message>> extends GridSpiAbstractTest<T> {
+public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -89,7 +90,11 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
@@ -669,7 +674,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi<
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi, boolean in) throws Exception {
- final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ final GridNioServer srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index ef9b413..d937bb0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
@@ -58,7 +59,7 @@ import org.junit.Test;
*
*/
@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
-public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi<Message>>
+public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
extends GridSpiAbstractTest<T> {
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
@@ -79,7 +80,11 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
*
*/
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
@@ -90,7 +95,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
}
/** */
- private static class TestListener implements CommunicationListener<Message> {
+ private class TestListener implements CommunicationListener<Message> {
/** */
private GridConcurrentHashSet<Long> msgIds = new GridConcurrentHashSet<>();
@@ -189,7 +194,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
final long totAcked0 = totAcked;
for (TcpCommunicationSpi spi : spis) {
- GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ GridNioServer srv = U.field(spi, "nioSrvr");
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
@@ -301,7 +306,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
// Check that session will not be closed by idle timeout because expected close by queue overflow.
assertTrue(spi0.getIdleConnectionTimeout() > awaitTime);
- final GridNioServer<?> srv1 = U.field(spi1, "nioSrvr");
+ final GridNioServer srv1 = U.field(spi1, "nioSrvr");
// For prevent session close by write timeout.
srv1.writeTimeout(60_000);
@@ -387,7 +392,7 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
* @throws Exception If failed.
*/
private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception {
- final GridNioServer<?> srv = U.field(spi, "nioSrvr");
+ final GridNioServer srv = U.field(spi, "nioSrvr");
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
index 26fb56b..99840c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -17,21 +17,28 @@
package org.apache.ignite.spi.communication.tcp;
+import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
@@ -56,7 +63,11 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
private final CountDownLatch latch = new CountDownLatch(1);
static {
- IgniteMessageFactoryImpl.registerCustom(GridTestMessage.DIRECT_TYPE, GridTestMessage::new);
+ GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+ @Override public Message apply() {
+ return new GridTestMessage();
+ }
+ });
}
/**
@@ -106,9 +117,19 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
* @param nodeIdx Node index.
* @return MBean instance.
*/
- private TcpCommunicationSpiMBean mbean(int nodeIdx) {
- return getMxBean(getTestIgniteInstanceName(nodeIdx), "SPIs",
- SynchronizedCommunicationSpi.class, TcpCommunicationSpiMBean.class);
+ private TcpCommunicationSpiMBean mbean(int nodeIdx) throws MalformedObjectNameException {
+ ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs",
+ SynchronizedCommunicationSpi.class.getSimpleName());
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ if (mbeanServer.isRegistered(mbeanName))
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class,
+ true);
+ else
+ fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+ return null;
}
/**
@@ -138,10 +159,10 @@ public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
latch.await(10, TimeUnit.SECONDS);
- ClusterGroup clusterGrpNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+ ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
// Send job from node0 to node1.
- grid(0).compute(clusterGrpNode1).call(new IgniteCallable<Boolean>() {
+ grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
@Override public Boolean call() throws Exception {
return Boolean.TRUE;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 6424bfc..da43ded 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -568,7 +567,7 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** {@inheritDoc} */
@Override public MessageFactory messageFactory() {
if (factory == null)
- factory = new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+ factory = new GridIoMessageFactory(null);
return factory;
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 5876bee..3374928 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -47,9 +47,7 @@ import org.apache.ignite.internal.managers.communication.IgniteCommunicationBala
import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteCommunicationSslBalanceTest;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImplTest;
import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
-import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConflictTest;
import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
@@ -353,9 +351,6 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationBalanceMultipleConnectionsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCommunicationSslBalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, IgniteIoTestMessagesTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, IgniteMessageFactoryImplTest.class, ignoredTests);
- GridTestUtils.addTestIfNeeded(suite, MessageDirectTypeIdConflictTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteIncompleteCacheObjectSelfTest.class, ignoredTests);
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index 0720b4a..ce04839 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -21,9 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -124,8 +122,7 @@ public class GridMessageCollectionTest {
assertEquals(m.directType(), type);
- IgniteMessageFactory msgFactory =
- new IgniteMessageFactoryImpl(new MessageFactory[]{new GridIoMessageFactory()});
+ GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
Message mx = msgFactory.create(type);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index e4aae7d..0ff53f7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -23,52 +23,107 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
-import org.apache.ignite.plugin.extensions.communication.IgniteMessageFactory;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
/**
* H2 Value message factory.
*/
-public class GridH2ValueMessageFactory implements MessageFactoryProvider {
+public class GridH2ValueMessageFactory implements MessageFactory {
/** {@inheritDoc} */
- @Override public void registerAll(IgniteMessageFactory factory) {
- factory.register((short)-4, () -> GridH2Null.INSTANCE);
- factory.register((short)-5, GridH2Boolean::new);
- factory.register((short)-6, GridH2Byte::new);
- factory.register((short)-7, GridH2Short::new);
- factory.register((short)-8, GridH2Integer::new);
- factory.register((short)-9, GridH2Long::new);
- factory.register((short)-10, GridH2Decimal::new);
- factory.register((short)-11, GridH2Double::new);
- factory.register((short)-12, GridH2Float::new);
- factory.register((short)-13, GridH2Time::new);
- factory.register((short)-14, GridH2Date::new);
- factory.register((short)-15, GridH2Timestamp::new);
- factory.register((short)-16, GridH2Bytes::new);
- factory.register((short)-17, GridH2String::new);
- factory.register((short)-18, GridH2Array::new);
- factory.register((short)-19, GridH2JavaObject::new);
- factory.register((short)-20, GridH2Uuid::new);
- factory.register((short)-21, GridH2Geometry::new);
- factory.register((short)-22, GridH2CacheObject::new);
- factory.register((short)-30, GridH2IndexRangeRequest::new);
- factory.register((short)-31, GridH2IndexRangeResponse::new);
- factory.register((short)-32, GridH2RowMessage::new);
- factory.register((short)-33, GridH2QueryRequest::new);
- factory.register((short)-34, GridH2RowRange::new);
- factory.register((short)-35, GridH2RowRangeBounds::new);
- factory.register((short)-54, QueryTable::new);
- factory.register((short)-55, GridH2DmlRequest::new);
- factory.register((short)-56, GridH2DmlResponse::new);
- factory.register((short)-57, GridH2SelectForUpdateTxDetails::new);
- }
+ @Nullable @Override public Message create(short type) {
+ switch (type) {
+ case -4:
+ return GridH2Null.INSTANCE;
- /** {@inheritDoc} */
- @Override @Nullable public Message create(short type) {
- throw new UnsupportedOperationException();
+ case -5:
+ return new GridH2Boolean();
+
+ case -6:
+ return new GridH2Byte();
+
+ case -7:
+ return new GridH2Short();
+
+ case -8:
+ return new GridH2Integer();
+
+ case -9:
+ return new GridH2Long();
+
+ case -10:
+ return new GridH2Decimal();
+
+ case -11:
+ return new GridH2Double();
+
+ case -12:
+ return new GridH2Float();
+
+ case -13:
+ return new GridH2Time();
+
+ case -14:
+ return new GridH2Date();
+
+ case -15:
+ return new GridH2Timestamp();
+
+ case -16:
+ return new GridH2Bytes();
+
+ case -17:
+ return new GridH2String();
+
+ case -18:
+ return new GridH2Array();
+
+ case -19:
+ return new GridH2JavaObject();
+
+ case -20:
+ return new GridH2Uuid();
+
+ case -21:
+ return new GridH2Geometry();
+
+ case -22:
+ return new GridH2CacheObject();
+
+ case -30:
+ return new GridH2IndexRangeRequest();
+
+ case -31:
+ return new GridH2IndexRangeResponse();
+
+ case -32:
+ return new GridH2RowMessage();
+
+ case -33:
+ return new GridH2QueryRequest();
+
+ case -34:
+ return new GridH2RowRange();
+
+ case -35:
+ return new GridH2RowRangeBounds();
+
+ case -54:
+ return new QueryTable();
+
+ case -55:
+ return new GridH2DmlRequest();
+
+ case -56:
+ return new GridH2DmlResponse();
+
+ case -57:
+ return new GridH2SelectForUpdateTxDetails();
+ }
+
+ return null;
}
/**