You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/07/17 09:09:46 UTC
[activemq-nms-amqp] branch master updated: AMQNET-589: Failover
implementation
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/master by this push:
new e9d8fd2 AMQNET-589: Failover implementation
new 2d565c7 Merge pull request #4 from cityindex/failover_implementation
e9d8fd2 is described below
commit e9d8fd2264d22a6531fe757e7f6f0fa68190cd71
Author: Havret <h4...@gmail.com>
AuthorDate: Sat Jun 1 09:21:41 2019 +0200
AMQNET-589: Failover implementation
---
.gitignore | 1 +
apache-nms-amqp.sln | 6 +-
src/HelloWorld/HelloWorld.cs | 35 +-
src/HelloWorld/HelloWorld.csproj | 1 +
src/NMS.AMQP/Apache-NMS-AMQP.csproj | 2 +
src/NMS.AMQP/Connection.cs | 1160 -----------------
src/NMS.AMQP/ConnectionFactory.cs | 439 +------
src/NMS.AMQP/ConnectionMetaData.cs | 2 +-
src/NMS.AMQP/Destination.cs | 341 -----
...esMessageCloak.cs => INmsConnectionListener.cs} | 25 +-
src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs | 217 ----
src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs | 163 ---
src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs | 216 ----
src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs | 650 ----------
.../Message/AMQP/AMQPMessageTransformation.cs | 79 --
.../Message/AMQP/AMQPObjectMessageCloak.cs | 413 ------
.../Message/AMQP/AMQPStreamMessageCloak.cs | 211 ---
src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs | 141 --
.../{Cloak/ITextMessageCloak.cs => AckType.cs} | 22 +-
.../INmsBytesMessageFacade.cs} | 22 +-
.../INmsMapMessageFacade.cs} | 17 +-
.../INmsMessageFacade.cs} | 58 +-
.../INmsObjectMessageFacade.cs} | 16 +-
.../INmsStreamMessageFacade.cs} | 24 +-
.../INmsTextMessageFacade.cs} | 12 +-
src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs | 137 --
src/NMS.AMQP/Message/Factory/IMessageFactory.cs | 75 --
src/NMS.AMQP/Message/Factory/MessageFactory.cs | 92 --
...StreamMessageCloak.cs => INmsMessageFactory.cs} | 34 +-
...esMessageCloak.cs => InboundMessageDispatch.cs} | 26 +-
src/NMS.AMQP/Message/MapMessage.cs | 72 --
src/NMS.AMQP/Message/Message.cs | 287 -----
.../{BytesMessage.cs => NmsBytesMessage.cs} | 469 +++----
.../AtomicSequence.cs => Message/NmsMapMessage.cs} | 48 +-
src/NMS.AMQP/Message/NmsMessage.cs | 233 ++++
src/NMS.AMQP/Message/NmsMessageTransformation.cs | 158 +++
.../{ObjectMessage.cs => NmsObjectMessage.cs} | 56 +-
src/NMS.AMQP/Message/NmsStreamMessage.cs | 469 +++++++
.../NmsTextMessage.cs} | 44 +-
...pMessageCloak.cs => OutboundMessageDispatch.cs} | 22 +-
src/NMS.AMQP/Message/StreamMessage.cs | 418 ------
src/NMS.AMQP/Message/TextMessage.cs | 67 -
src/NMS.AMQP/MessageConsumer.cs | 977 --------------
src/NMS.AMQP/MessageLink.cs | 506 --------
src/NMS.AMQP/MessageProducer.cs | 555 --------
src/NMS.AMQP/Meta/ConnectionInfo.cs | 134 ++
src/NMS.AMQP/Meta/ConsumerInfo.cs | 52 +
src/NMS.AMQP/Meta/LinkInfo.cs | 61 +
src/NMS.AMQP/Meta/ProducerInfo.cs | 74 ++
.../ResourceInfo.cs} | 27 +-
src/NMS.AMQP/Meta/SessionInfo.cs | 66 +
src/NMS.AMQP/NMSConnectionFactory.cs | 40 -
src/NMS.AMQP/NMSResource.cs | 158 ---
src/NMS.AMQP/NmsAcknowledgeCallback.cs | 58 +
src/NMS.AMQP/NmsConnection.cs | 563 ++++++++
src/NMS.AMQP/NmsConnectionFactory.cs | 249 ++++
src/NMS.AMQP/NmsMessageConsumer.cs | 469 +++++++
src/NMS.AMQP/NmsMessageProducer.cs | 256 ++++
.../Cloak/IStreamMessageCloak.cs => NmsQueue.cs} | 36 +-
src/NMS.AMQP/NmsSession.cs | 525 ++++++++
...tomicSequence.cs => NmsTemporaryDestination.cs} | 49 +-
...derTransportContext.cs => NmsTemporaryQueue.cs} | 37 +-
...derTransportContext.cs => NmsTemporaryTopic.cs} | 43 +-
.../Cloak/IBytesMessageCloak.cs => NmsTopic.cs} | 29 +-
src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs | 199 +++
.../Provider/Amqp/AmqpConnectionSession.cs | 64 +
src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs | 310 +++++
src/NMS.AMQP/Provider/Amqp/AmqpMessageIdHelper.cs | 309 +++++
src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs | 193 +++
src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs | 214 +++
.../Amqp/AmqpProviderFactory.cs} | 47 +-
src/NMS.AMQP/Provider/Amqp/AmqpSession.cs | 149 +++
.../Provider/Amqp/AmqpTemporaryDestination.cs | 114 ++
src/NMS.AMQP/Provider/Amqp/Message/AmqpCodec.cs | 140 ++
.../Provider/Amqp/Message/AmqpMessageFactory.cs | 94 ++
.../Amqp/Message/AmqpNmsBytesMessageFacade.cs | 195 +++
.../Amqp/Message/AmqpNmsMapMessageFacade.cs | 92 ++
.../Provider/Amqp/Message/AmqpNmsMessageFacade.cs | 479 +++++++
.../Amqp/Message/AmqpNmsObjectMessageFacade.cs | 95 ++
.../Amqp/Message/AmqpNmsStreamMessageFacade.cs | 161 +++
.../Amqp/Message/AmqpNmsTextMessageFacade.cs | 106 ++
.../Amqp/Message/AmqpSerializedObjectDelegate.cs | 105 ++
.../Amqp/Message/AmqpTypedObjectDelegate.cs | 107 ++
.../Amqp/Message/IAmqpObjectTypeDelegate.cs} | 28 +-
src/NMS.AMQP/Provider/Failover/FailoverProvider.cs | 673 ++++++++++
.../Failover/FailoverProviderFactory.cs} | 44 +-
src/NMS.AMQP/Provider/Failover/FailoverRequest.cs | 136 ++
.../Failover/FailoverUriPool.cs} | 76 +-
.../IMessageQueue.cs => Provider/IProvider.cs} | 63 +-
.../IProviderFactory.cs} | 14 +-
src/NMS.AMQP/Provider/IProviderListener.cs | 74 ++
src/NMS.AMQP/Provider/ProviderFactory.cs | 49 +
src/NMS.AMQP/Queue.cs | 140 --
src/NMS.AMQP/RemoveSubscriptionLink.cs | 165 ---
src/NMS.AMQP/Session.cs | 920 -------------
src/NMS.AMQP/SessionDispatcher.cs | 65 +
src/NMS.AMQP/TemporaryLink.cs | 130 --
.../ITextMessageCloak.cs => TerminusDurability.cs} | 18 +-
src/NMS.AMQP/Topic.cs | 138 --
...nsportContext.cs => ISecureTransportContext.cs} | 8 +-
...derTransportContext.cs => ITransportContext.cs} | 24 +-
.../{Secure/AMQP => }/SecureTransportContext.cs | 42 +-
.../Transport/{AMQP => }/TransportContext.cs | 91 +-
...nsportContext.cs => TransportContextFactory.cs} | 42 +-
src/NMS.AMQP/Util/AmqpDestinationHelper.cs | 240 ++++
.../Util/{AtomicSequence.cs => AtomicBool.cs} | 43 +-
src/NMS.AMQP/Util/AtomicSequence.cs | 2 +-
src/NMS.AMQP/Util/DispatchExecutor.cs | 733 -----------
src/NMS.AMQP/Util/IdGenerator.cs | 4 +-
src/NMS.AMQP/Util/LinkCache.cs | 190 ---
src/NMS.AMQP/Util/MessageSupport.cs | 321 +----
src/NMS.AMQP/Util/PriorityMessageQueue.cs | 141 ++
src/NMS.AMQP/Util/PropertyUtil.cs | 490 +------
src/NMS.AMQP/Util/SymbolUtil.cs | 4 +-
src/NMS.AMQP/Util/TaskUtil.cs | 81 --
.../Util/Types/Map/AMQP/AMQPPrimitiveMap.cs | 5 +-
src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs | 78 +-
src/NMS.AMQP/Util/Types/Queue/FIFOMessageQueue.cs | 103 --
src/NMS.AMQP/Util/Types/Queue/MessageQueueBase.cs | 251 ----
.../Util/Types/Queue/PriorityMessageQueue.cs | 173 ---
.../IBytesMessageCloak.cs => Util/URISupport.cs} | 27 +-
src/NMS.AMQP/Util/UriUtil.cs | 71 -
src/StructuredMessage/StructuredMessage.cs | 35 +-
src/StructuredMessage/StructuredMessage.csproj | 1 +
.../Apache-NMS-AMQP-Test.csproj | 18 +-
test/Apache-NMS-AMQP-Test/AtomicBoolTest.cs | 70 +
test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs | 110 ++
.../AmqpAcknowledgmentsIntegrationTest.cs | 288 +++++
.../Integration/ConnectionIntegrationTest.cs | 248 ++++
.../Integration/ConsumerIntegrationTest.cs | 873 +++++++++++++
.../Integration/FailoverIntegrationTest.cs | 749 +++++++++++
.../Integration/ProducerIntegrationTest.cs | 638 +++++++++
.../Integration/SessionIntegrationTest.cs | 160 +++
.../Integration/SubscriptionsIntegrationTest.cs | 83 ++
.../Integration/TemporaryQueueIntegrationTest.cs | 114 ++
.../Integration/TemporaryTopicIntegrationTest.cs | 113 ++
.../Message/Facade/NmsTestBytesMessageFacade.cs | 89 ++
.../Message/Facade/NmsTestMapMessageFacade.cs | 18 +-
.../Message/Facade/NmsTestMessageFacade.cs | 83 ++
.../Message/Facade/NmsTestObjectMessageFacade.cs | 28 +-
.../Message/Facade/NmsTestStreamMessageFacade.cs | 61 +
.../Message/Facade/NmsTestTextMessageFacade.cs | 35 +-
.../Message/Facade/TestMessageFactory.cs | 75 ++
.../Message/Foreign/ForeignNmsBytesMessage.cs | 155 +++
.../Message/Foreign/ForeignNmsMapMessage.cs | 21 +-
.../Message/Foreign/ForeignNmsMessage.cs | 106 ++
.../Message/Foreign/ForeignNmsObjectMessage.cs | 26 +-
.../Message/Foreign/ForeignNmsStreamMessage.cs | 148 +++
.../Message/Foreign/ForeignNmsTextMessage.cs | 26 +-
.../Message/NmsBytesMessageTest.cs | 538 ++++++++
.../Message/NmsMapMessageTest.cs | 54 +
.../Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs | 442 +++++++
.../Message/NmsMessageTransformationTest.cs | 315 +++++
.../Message/NmsObjectMessageTest.cs | 138 ++
.../Message/NmsStreamMessageTest.cs | 1019 +++++++++++++++
.../Message/NmsTextMessageTest.cs | 143 ++
test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs | 148 +++
.../Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs | 134 ++
.../Apache-NMS-AMQP-Test/NmsQueueTest.cs | 46 +-
.../Apache-NMS-AMQP-Test/NmsTopicTest.cs | 46 +-
.../PriorityMessageQueueTest.cs | 221 ++++
test/Apache-NMS-AMQP-Test/PropertyUtilTest.cs | 78 ++
.../Provider/Amqp/AmqpCodecTest.cs | 585 +++++++++
.../Provider/Amqp/AmqpMessageFactoryTest.cs | 175 +++
.../Provider/Amqp/AmqpMessageIdHelperTest.cs | 61 +
.../Provider/Amqp/AmqpNmsBytesMessageFacadeTest.cs | 536 ++++++++
.../Provider/Amqp/AmqpNmsMapMessageFacadeTest.cs | 232 ++++
.../Provider/Amqp/AmqpNmsMessageFacadeTest.cs | 1361 ++++++++++++++++++++
.../Provider/Amqp/AmqpNmsMessageTypesTestCase.cs | 145 +++
.../Amqp/AmqpNmsObjectMessageFacadeTest.cs | 372 ++++++
.../Amqp/AmqpNmsStreamMessageFacadeTest.cs | 297 +++++
.../Provider/Amqp/AmqpNmsTextMessageFacadeTest.cs | 242 ++++
.../Provider/Amqp/AmqpProviderFactoryTest.cs | 58 +
.../Provider/Amqp/AmqpProviderTest.cs | 46 +-
.../Provider/FailoverProviderFactoryTest.cs | 88 ++
.../Provider/FailoverProviderTest.cs | 295 +++++
.../Provider/FailoverUriPoolTest.cs | 78 ++
.../Provider/Mock/MockProvider.cs | 129 ++
.../Provider/Mock/MockProviderConfiguration.cs | 16 +-
.../Provider/Mock/MockProviderFactory.cs | 48 +-
.../Provider/Mock/MockProviderStats.cs | 97 ++
.../Provider/Mock/MockRemotePeer.cs | 29 +-
.../Test/Attribute/ConnectionSetup.cs | 0
.../Test/Attribute/ConsumerSetup.cs | 0
.../Test/Attribute/DestinationSetup.cs | 0
.../Test/Attribute/ProducerSetup.cs | 0
.../Test/Attribute/SessionSetup.cs | 0
.../Test/Attribute/TestSetup.cs | 0
.../Test/TestCase/BaseTestCase.cs | 0
.../Test/TestCase/ConnectionTest.cs | 0
.../Test/TestCase/ConsumerTest.cs | 0
.../Test/TestCase/MessageTest.cs | 0
.../Test/TestCase/ProducerTest.cs | 0
.../Test/TestCase/SecureConnectionTest.cs | 6 +-
.../Test/TestCase/SessionTest.cs | 0
.../Test/Util/NMSLogger.cs | 0
.../Test/Util/TestConfig.cs | 0
.../TestAmqp/MockLinkEndpoint.cs | 51 +
.../TestAmqp/MockLinkProcessor.cs | 31 +-
.../TestAmqp/TerminusDurability.cs | 16 +-
test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 89 ++
.../TestAmqp/TestLinkEndpoint.cs | 38 +-
.../TestAmqp/TestLinkProcessor.cs | 73 ++
test/Apache-NMS-AMQP-Test/TestAmqp/TestListener.cs | 341 +++++
.../TestAmqp/TestMessageProcessor.cs | 29 +-
.../TestAmqp/TestMessageSource.cs | 88 ++
.../TestAmqp/TestRequestProcessor.cs | 20 +-
test/{ => Apache-NMS-AMQP-Test}/TestSuite.config | 0
.../Transport/TransportContextFactoryTest.cs | 86 ++
test/Apache-NMS-AMQP-Test/URISupportTest.cs | 61 +
.../config/Adapter.runsettings | 0
.../config/cert/ReadMe.md | 0
.../config/cert/broker.crt | 0
.../config/cert/broker.key | 0
test/{ => Apache-NMS-AMQP-Test}/config/cert/ca.crt | 0
test/{ => Apache-NMS-AMQP-Test}/config/cert/ca.key | 0
.../config/cert/client.crt | 0
.../config/cert/client.key | 0
.../config/cert/client_trust.jks | Bin
.../config/cert/nms_test_broker.jks | Bin
.../config/cert/nms_test_broker.p12 | Bin
test/NMS-AMQP.Test.csproj.user | 6 -
222 files changed, 21924 insertions(+), 12608 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7cf535c..412b85f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -259,3 +259,4 @@ ModelManifest.xml
# FAKE - F# Make
.fake/
+.idea/
diff --git a/apache-nms-amqp.sln b/apache-nms-amqp.sln
index 1da756d..3f33096 100644
--- a/apache-nms-amqp.sln
+++ b/apache-nms-amqp.sln
@@ -1,12 +1,12 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.26730.10
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.28922.388
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP", "src\NMS.AMQP\Apache-NMS-AMQP.csproj", "{0D8CF699-9702-4EC6-8719-C2968D32F09A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloWorld", "src\HelloWorld\HelloWorld.csproj", "{A56349BE-ED66-4E18-B5FE-E3EA069D2ADD}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP-Test", "test\Apache-NMS-AMQP-Test.csproj", "{DF402917-D85D-421C-A7E2-DC3DF371B9CB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP-Test", "test\Apache-NMS-AMQP-Test\Apache-NMS-AMQP-Test.csproj", "{DF402917-D85D-421C-A7E2-DC3DF371B9CB}"
ProjectSection(ProjectDependencies) = postProject
{0D8CF699-9702-4EC6-8719-C2968D32F09A} = {0D8CF699-9702-4EC6-8719-C2968D32F09A}
EndProjectSection
diff --git a/src/HelloWorld/HelloWorld.cs b/src/HelloWorld/HelloWorld.cs
index 2e10cec..d17a2db 100644
--- a/src/HelloWorld/HelloWorld.cs
+++ b/src/HelloWorld/HelloWorld.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Specialized;
using Apache.NMS;
+using Apache.NMS.AMQP;
using CommandLine;
namespace HelloWorld
@@ -65,20 +66,6 @@ namespace HelloWorld
Uri providerUri = new Uri(ip);
Console.WriteLine("scheme: {0}", providerUri.Scheme);
- StringDictionary properties = new StringDictionary();
- if (opts.username != null)
- {
- properties["NMS.username"] = opts.username;
- }
- if (opts.password != null)
- {
- properties["NMS.password"] = opts.password;
- }
- if (opts.clientId != null)
- {
- properties["NMS.clientId"] = opts.clientId;
- }
- properties["NMS.sendtimeout"] = opts.connTimeout+"";
IConnection conn = null;
if (opts.topic == null && opts.queue == null)
{
@@ -87,8 +74,24 @@ namespace HelloWorld
}
try
{
- Apache.NMS.AMQP.NMSConnectionFactory providerFactory = new Apache.NMS.AMQP.NMSConnectionFactory(providerUri, properties);
- IConnectionFactory factory = providerFactory.ConnectionFactory;
+ NmsConnectionFactory factory = new NmsConnectionFactory(ip);
+ if (opts.username != null)
+ {
+ factory.UserName = opts.username;
+ }
+ if (opts.password != null)
+ {
+ factory.Password = opts.password;
+ }
+ if (opts.clientId != null)
+ {
+ factory.ClientId = opts.clientId;
+ }
+
+ if (opts.connTimeout != default)
+ {
+ factory.SendTimeout = opts.connTimeout;
+ }
Console.WriteLine("Creating Connection...");
conn = factory.CreateConnection();
diff --git a/src/HelloWorld/HelloWorld.csproj b/src/HelloWorld/HelloWorld.csproj
index 22fd330..e2176fe 100644
--- a/src/HelloWorld/HelloWorld.csproj
+++ b/src/HelloWorld/HelloWorld.csproj
@@ -21,6 +21,7 @@ under the License.
<OutputType>Exe</OutputType>
<RootNamespace>HelloWorld</RootNamespace>
<AssemblyName>HelloWorld</AssemblyName>
+ <LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index b4fe988..a4074aa 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -22,6 +22,7 @@ with the License. You may obtain a copy of the License at
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<RootNamespace>Apache.NMS.AMQP</RootNamespace>
<AssemblyName>Apache.NMS.AMQP</AssemblyName>
+ <LangVersion>7.3</LangVersion>
</PropertyGroup>
<PropertyGroup>
@@ -63,5 +64,6 @@ with the License. You may obtain a copy of the License at
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
<PackageReference Include="AMQPNetLite.Core" Version="2.1.7" />
<PackageReference Include="Apache.NMS" Version="1.8.0" />
+ <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
</ItemGroup>
</Project>
diff --git a/src/NMS.AMQP/Connection.cs b/src/NMS.AMQP/Connection.cs
deleted file mode 100644
index c7af40f..0000000
--- a/src/NMS.AMQP/Connection.cs
+++ /dev/null
@@ -1,1160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Collections.Specialized;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Amqp;
-using Amqp.Framing;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Transport;
-using System.Reflection;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP
-{
- using Message.Factory;
- enum ConnectionState
- {
- UNKNOWN = -1,
- INITIAL = 0,
- CONNECTING = 1,
- CONNECTED = 2,
- CLOSING = 3,
- CLOSED = 4,
-
- }
-
- /// <summary>
- /// Apache.NMS.AMQP.Connection facilitates management and creates the underlying Amqp.Connection protocol engine object.
- /// Apache.NMS.AMQP.Connection is also the NMS.AMQP.Session Factory.
- /// </summary>
- class Connection : NMSResource<ConnectionInfo>, Apache.NMS.IConnection
- {
- public static readonly string MESSAGE_OBJECT_SERIALIZATION_PROP = PropertyUtil.CreateProperty("Message.Serialization", ConnectionFactory.ConnectionPropertyPrefix);
-
- private IRedeliveryPolicy redeliveryPolicy;
- private Amqp.IConnection impl;
- private ProviderCreateConnection implCreate;
- private ConnectionInfo connInfo;
- private readonly IdGenerator clientIdGenerator;
- private Atomic<bool> clientIdCanSet = new Atomic<bool>(true);
- private Atomic<bool> closing = new Atomic<bool>(false);
- private Atomic<ConnectionState> state = new Atomic<ConnectionState>(ConnectionState.INITIAL);
- private CountDownLatch latch;
- private ConcurrentDictionary<Id, Session> sessions = new ConcurrentDictionary<Id, Session>();
- private IdGenerator sesIdGen = null;
- private IdGenerator tempTopicIdGen = null;
- private IdGenerator tempQueueIdGen = null;
- private StringDictionary properties;
- private TemporaryLinkCache temporaryLinks = null;
- private IProviderTransportContext transportContext = null;
- private DispatchExecutor exceptionExecutor = null;
-
- #region Contructor
-
- internal Connection(Uri addr, IdGenerator clientIdGenerator)
- {
- connInfo = new ConnectionInfo();
- connInfo.remoteHost = addr;
- Info = connInfo;
- this.clientIdGenerator = clientIdGenerator;
- latch = new CountDownLatch(1);
- temporaryLinks = new TemporaryLinkCache(this);
- }
-
- #endregion
-
- #region Internal Properties
-
- internal Amqp.IConnection InnerConnection { get { return this.impl; } }
-
- internal IdGenerator SessionIdGenerator
- {
- get
- {
- IdGenerator sig = sesIdGen;
- lock (this)
- {
- if (sig == null)
- {
- sig = new NestedIdGenerator("ID:ses", connInfo.Id, true);
- sesIdGen = sig;
- }
- }
- return sig;
- }
- }
-
- internal IdGenerator TemporaryTopicGenerator
- {
- get
- {
- IdGenerator ttg = tempTopicIdGen;
- lock (this)
- {
- if (ttg == null)
- {
- ttg = new NestedIdGenerator("ID:nms-temp-topic", Info.Id, true);
- tempTopicIdGen = ttg;
- }
- }
- return ttg;
- }
- }
-
- internal IdGenerator TemporaryQueueGenerator
- {
- get
- {
- IdGenerator tqg = tempQueueIdGen;
- lock (this)
- {
- if (tqg == null)
- {
- tqg = new NestedIdGenerator("ID:nms-temp-queue", Info.Id, true);
- tempQueueIdGen = tqg;
- }
- }
- return tqg;
- }
- }
-
- internal bool IsConnected
- {
- get
- {
- return this.state.Value.Equals(ConnectionState.CONNECTED);
- }
- }
-
- internal bool IsClosed
- {
- get
- {
- return this.state.Value.Equals(ConnectionState.CLOSED);
- }
- }
-
- internal ushort MaxChannel
- {
- get { return connInfo.channelMax; }
- }
-
- internal MessageTransformation TransformFactory
- {
- get
- {
- return MessageFactory<ConnectionInfo>.Instance(this).GetTransformFactory();
- }
- }
-
- internal IMessageFactory MessageFactory
- {
- get
- {
- return MessageFactory<ConnectionInfo>.Instance(this);
- }
- }
-
- internal string TopicPrefix
- {
- get { return connInfo.TopicPrefix; }
- }
-
- internal string QueuePrefix
- {
- get { return connInfo.QueuePrefix; }
- }
-
- internal bool IsAnonymousRelay
- {
- get { return connInfo.IsAnonymousRelay; }
- }
-
- internal bool IsDelayedDelivery
- {
- get { return connInfo.IsDelayedDelivery; }
- }
-
- #endregion
-
- #region Internal Methods
-
- internal ITemporaryTopic CreateTemporaryTopic()
- {
- TemporaryTopic temporaryTopic = new TemporaryTopic(this);
-
- CreateTemporaryLink(temporaryTopic);
-
- return temporaryTopic;
- }
-
- internal ITemporaryQueue CreateTemporaryQueue()
- {
- TemporaryQueue temporaryQueue = new TemporaryQueue(this);
-
- CreateTemporaryLink(temporaryQueue);
-
- return temporaryQueue;
- }
-
- private void CreateTemporaryLink(TemporaryDestination temporaryDestination)
- {
- TemporaryLink link = new TemporaryLink(temporaryLinks.Session, temporaryDestination);
-
- link.Attach();
-
- temporaryLinks.AddLink(temporaryDestination, link);
-
- }
-
- /// <summary>
- /// Unsubscribes Durable Consumers on the connection
- /// </summary>
- /// <param name="name">The subscription name.</param>
- internal void Unsubscribe(string name)
- {
- // check for any active consumers on the subscription name.
- foreach (Session session in GetSessions())
- {
- if (session.ContainsSubscriptionName(name))
- {
- throw new IllegalStateException("Cannot unsubscribe from Durable Consumer while consuming messages.");
- }
- }
- // unsubscribe using an instance of RemoveSubscriptionLink.
- RemoveSubscriptionLink removeLink = new RemoveSubscriptionLink(this.temporaryLinks.Session, name);
- removeLink.Unsubscribe();
- }
-
- internal bool ContainsSubscriptionName(string name)
- {
- foreach (Session session in GetSessions())
- {
- if (session.ContainsSubscriptionName(name))
- {
- return true;
- }
- }
- return false;
- }
-
- internal void Configure(ConnectionFactory cf)
- {
- Amqp.ConnectionFactory cfImpl = cf.Factory as Amqp.ConnectionFactory;
-
- // get properties from connection factory
- StringDictionary properties = cf.ConnectionProperties;
-
- // apply connection properties to connection factory and connection info.
- PropertyUtil.SetProperties(cfImpl.AMQP, properties, ConnectionFactory.ConnectionPropertyPrefix);
- PropertyUtil.SetProperties(connInfo, properties, ConnectionFactory.ConnectionPropertyPrefix);
-
- // create copy of transport context
- this.transportContext = cf.Context.Copy();
-
- // Store raw properties for future objects
- this.properties = PropertyUtil.Clone(properties);
-
- // Create Connection builder delegate.
- this.implCreate = this.transportContext.CreateConnectionBuilder();
- }
-
- internal StringDictionary Properties
- {
- get { return PropertyUtil.Merge(this.properties, PropertyUtil.GetProperties(this.connInfo)); }
- }
-
- internal void Remove(TemporaryDestination destination)
- {
- temporaryLinks.RemoveLink(destination);
- }
-
- internal void DestroyTemporaryDestination(TemporaryDestination destination)
- {
- ThrowIfClosed();
- foreach(Session session in GetSessions())
- {
- if (session.IsDestinationInUse(destination))
- {
- throw new IllegalStateException("Cannot delete Temporary Destination, {0}, while consuming messages.");
- }
- }
- try
- {
- TemporaryLink link = temporaryLinks.RemoveLink(destination);
- if(link != null && !link.IsClosed)
- {
- link.Close();
- }
- }
- catch (Exception e)
- {
- throw ExceptionSupport.Wrap(e);
- }
- }
-
- internal void Remove(Session ses)
- {
- Session result = null;
- if(!sessions.TryRemove(ses.Id, out result))
- {
- Tracer.WarnFormat("Could not disassociate Session {0} with Connection {1}.", ses.Id, ClientId);
- }
- }
-
- private Session[] GetSessions()
- {
- return sessions.Values.ToArray();
- }
-
- private void CheckIfClosed()
- {
- if (this.state.Value.Equals(ConnectionState.CLOSED))
- {
- throw new IllegalStateException("Operation invalid on closed connection.");
- }
- }
-
- private void ProcessCapabilities(Open openResponse)
- {
- if(openResponse.OfferedCapabilities != null || openResponse.OfferedCapabilities.Length > 0)
- {
- foreach(Amqp.Types.Symbol symbol in openResponse.OfferedCapabilities)
- {
- if (SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY.Equals(symbol))
- {
- connInfo.IsAnonymousRelay = true;
- }
- else if (SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY.Equals(symbol))
- {
- connInfo.IsDelayedDelivery = true;
- }
- else
- {
- connInfo.AddCapability(symbol);
- }
- }
- }
- }
-
- private void ProcessRemoteConnectionProperties(Open openResponse)
- {
- if (openResponse.Properties != null && openResponse.Properties.Count > 0)
- {
- foreach(object key in openResponse.Properties.Keys)
- {
- string keyString = key.ToString();
- string valueString = openResponse.Properties[key]?.ToString();
- this.connInfo.RemotePeerProperies.Add(keyString, valueString);
- }
- }
- }
-
- private void OpenResponse(Amqp.IConnection conn, Open openResp)
- {
- Tracer.InfoFormat("Connection {0}, Open {0}", conn.ToString(), openResp.ToString());
- Tracer.DebugFormat("Open Response : \n Hostname = {0},\n ContainerId = {1},\n MaxChannel = {2},\n MaxFrame = {3}\n", openResp.HostName, openResp.ContainerId, openResp.ChannelMax, openResp.MaxFrameSize);
- Tracer.DebugFormat("Open Response Descriptor : \n Descriptor Name = {0},\n Descriptor Code = {1}\n", openResp.Descriptor.Name, openResp.Descriptor.Code);
- ProcessCapabilities(openResp);
- ProcessRemoteConnectionProperties(openResp);
- if (SymbolUtil.CheckAndCompareFields(openResp.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
- {
- Tracer.InfoFormat("Open response contains {0} property the connection {1} will soon be closed.", SymbolUtil.CONNECTION_ESTABLISH_FAILED, this.ClientId);
- }
- else
- {
- object value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
- if(value != null && value is string)
- {
- this.connInfo.TopicPrefix = value as string;
- }
- value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX);
- if (value != null && value is string)
- {
- this.connInfo.QueuePrefix = value as string;
- }
- this.latch?.countDown();
- }
- }
-
- private Open CreateOpenFrame(ConnectionInfo connInfo)
- {
- Open frame = new Open();
- frame.ContainerId = connInfo.clientId;
- frame.ChannelMax = connInfo.channelMax;
- frame.MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize);
- frame.HostName = connInfo.remoteHost.Host;
- frame.IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout);
- frame.DesiredCapabilities = new Amqp.Types.Symbol[] {
- SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
- SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
- SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
- };
-
- return frame;
- }
-
- internal void Connect()
- {
- if (this.state.CompareAndSet(ConnectionState.INITIAL, ConnectionState.CONNECTING))
- {
- Address addr = UriUtil.ToAddress(connInfo.remoteHost, connInfo.username, connInfo.password ?? string.Empty);
- Tracer.InfoFormat("Creating Address: {0}", addr.Host);
- if (this.clientIdCanSet.CompareAndSet(true, false))
- {
- if (this.ClientId == null)
- {
- connInfo.ResourceId = this.clientIdGenerator.GenerateId();
- }
- else
- {
- connInfo.ResourceId = new Id(ClientId);
- }
- Tracer.InfoFormat("Staring Connection with Client Id : {0}", this.ClientId);
- }
-
- Open openFrame = CreateOpenFrame(this.connInfo);
-
- this.latch = new CountDownLatch(1);
-
- Task<Amqp.Connection> fconn = this.implCreate(addr, openFrame, this.OpenResponse);
- // wait until the Open request is sent
- this.impl = TaskUtil.Wait(fconn, connInfo.connectTimeout);
- if(fconn.Exception != null)
- {
- // exceptions thrown from TaskUtil are typically System.AggregateException and are usually transport exceptions for secure transport.
- // extract the innerException of interest and wrap it as an NMS exception
- if (fconn.Exception is AggregateException)
- {
- throw ExceptionSupport.Wrap(fconn.Exception.InnerException,
- "Failed to connect host {0}. Cause: {1}", openFrame.HostName, fconn.Exception.InnerException?.Message ?? fconn.Exception.Message);
- }
- else {
- throw ExceptionSupport.Wrap(fconn.Exception,
- "Failed to connect host {0}. Cause: {1}", openFrame.HostName, fconn.Exception?.Message);
- }
- }
-
- this.impl.Closed += OnInternalClosed;
- this.impl.AddClosedCallback(OnInternalClosed);
-
- ConnectionState finishedState = ConnectionState.UNKNOWN;
- // Wait for Open response
- try
- {
- bool received = this.latch.await((this.Info.requestTimeout==0) ? Timeout.InfiniteTimeSpan : this.RequestTimeout);
- if (received && this.impl.Error == null && fconn.Exception == null)
- {
-
- Tracer.InfoFormat("Connection {0} has connected.", this.impl.ToString());
- finishedState = ConnectionState.CONNECTED;
- // register connection factory once client Id accepted.
- MessageFactory<ConnectionInfo>.Register(this);
- }
- else
- {
- if (!received)
- {
- // Timeout occured waiting on response
- Tracer.InfoFormat("Connection Response Timeout. Failed to receive response from {0} in {1}ms", addr.Host, this.Info.requestTimeout);
- }
- finishedState = ConnectionState.INITIAL;
-
- if (fconn.Exception == null)
- {
- if (!received) throw ExceptionSupport.GetTimeoutException(this.impl, "Connection {0} has failed to connect in {1}ms.", ClientId, connInfo.closeTimeout);
- Tracer.ErrorFormat("Connection {0} has Failed to connect. Message: {1}", ClientId, (this.impl.Error == null ? "Unknown" : this.impl.Error.ToString()));
-
- throw ExceptionSupport.GetException(this.impl, "Connection {0} has failed to connect.", ClientId);
- }
- else
- {
- throw ExceptionSupport.Wrap(fconn.Exception, "Connection {0} failed to connect.", ClientId);
- }
-
- }
- }
- finally
- {
- this.latch = null;
- this.state.GetAndSet(finishedState);
- if (finishedState != ConnectionState.CONNECTED)
- {
- this.impl.Close(TimeSpan.FromMilliseconds(connInfo.closeTimeout),null);
- }
- }
- }
- }
-
- private void Shutdown()
- {
- foreach(Session s in GetSessions())
- {
- s.Shutdown();
- }
- // signals to the DispatchExecutor to stop enqueue exception notifications
- // and drain off remaining notifications.
- this.exceptionExecutor?.Shutdown();
- }
-
- private void OnInternalClosed(IAmqpObject sender, Error error)
- {
- string name = null;
- Connection self = null;
- try
- {
- self = this;
- // name should throw should the finalizer of the Connection object already completed.
- name = self.ClientId;
- Tracer.InfoFormat("Received Close Request for Connection {0}.", name);
- if (self.state.CompareAndSet(ConnectionState.CONNECTED, ConnectionState.CLOSED))
- {
- // unexpected or amqp transport close.
-
- // notify any error to exception Dispatcher.
- if (error != null)
- {
- NMSException nmse = ExceptionSupport.GetException(error, "Connection {0} Closed.", name);
- self.OnException(nmse);
- }
-
- // shutdown connection facilities.
- if (self.IsStarted)
- {
- self.Shutdown();
- }
- MessageFactory<ConnectionInfo>.Unregister(self);
- }
- else if (self.state.CompareAndSet(ConnectionState.CLOSING, ConnectionState.CLOSED))
- {
- // application close.
- MessageFactory<ConnectionInfo>.Unregister(self);
- }
- self.latch?.countDown();
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Caught Exception during Amqp Connection close for NMS Connection{0}. Exception {1}",
- name != null ? (" " + name) : "", ex);
- }
- }
-
- private void Disconnect()
- {
- if(this.state.CompareAndSet(ConnectionState.CONNECTED, ConnectionState.CLOSING) && this.impl!=null)
- {
- Tracer.InfoFormat("Sending Close Request On Connection {0}.", ClientId);
- try
- {
- if (!this.impl.IsClosed)
- {
- this.impl.Close(TimeSpan.FromMilliseconds(connInfo.closeTimeout), null);
- }
- }
- catch (AmqpException amqpEx)
- {
- throw ExceptionSupport.Wrap(amqpEx, "Error Closing Amqp Connection " + ClientId);
- }
- catch (TimeoutException tmoutEx)
- {
- throw ExceptionSupport.GetTimeoutException(this.impl, "Timeout waiting for Amqp Connection {0} Close response. Message : {1}", ClientId, tmoutEx.Message);
- }
- finally
- {
- if (this.state.CompareAndSet(ConnectionState.CLOSING, ConnectionState.CLOSED))
- {
- // connection cleanup.
- MessageFactory<ConnectionInfo>.Unregister(this);
- this.impl = null;
- }
-
- }
- }
- }
-
- protected DispatchExecutor ExceptionExecutor
- {
- get
- {
- if(exceptionExecutor == null && !IsClosed)
- {
- exceptionExecutor = new DispatchExecutor(true);
- exceptionExecutor.Start();
- }
- return exceptionExecutor;
- }
- }
-
- internal void OnException(Exception ex)
- {
- Apache.NMS.ExceptionListener listener = this.ExceptionListener;
- if(listener != null)
- {
- ExceptionNotification en = new ExceptionNotification(this, ex);
- this.ExceptionExecutor.Enqueue(en);
- }
- }
-
- protected void DispatchException(Exception ex)
- {
- Apache.NMS.ExceptionListener listener = this.ExceptionListener;
- if (listener != null)
- {
- // Wrap does nothing if this is already a NMS exception, otherwise
- // wrap it appropriately.
- listener(ExceptionSupport.Wrap(ex));
- }
- else
- {
- Tracer.WarnFormat("Received Async exception. Type {0} Message {1}", ex.GetType().Name, ex.Message);
- Tracer.DebugFormat("Async Exception Stack {0}", ex);
- }
- }
-
- private class ExceptionNotification : DispatchEvent
- {
- private readonly Connection connection;
- private readonly Exception exception;
- public ExceptionNotification(Connection owner, Exception ex)
- {
- connection = owner;
- exception = ex;
- Callback = this.Nofify;
- }
-
- public override void OnFailure(Exception e)
- {
- base.OnFailure(e);
- connection.DispatchException(e);
- }
-
- private void Nofify()
- {
- connection.DispatchException(exception);
- }
-
- }
-
- #endregion
-
- #region IConnection methods
-
- AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
- /// <summary>
- /// Sets the <see cref="Apache.NMS.AcknowledgementMode"/> for the <see cref="Apache.NMS.ISession"/>
- /// objects created by the connection.
- /// </summary>
- public AcknowledgementMode AcknowledgementMode
- {
- get { return acknowledgementMode; }
- set { acknowledgementMode = value; }
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.ClientId"/>.
- /// </summary>
- public string ClientId
- {
- get { return connInfo.clientId; }
- set
- {
- if (this.clientIdCanSet.Value)
- {
- if (value != null && value.Length > 0)
- {
- connInfo.clientId = value;
- try
- {
- this.Connect();
- }
- catch (NMSException nms)
- {
- NMSException ex = nms;
- if (nms.Message.Contains("invalid-field:container-id"))
- {
- ex = new InvalidClientIDException(nms.Message);
- }
- throw ex;
- }
- }
- }
- else
- {
- throw new InvalidClientIDException("Client Id can not be set after connection is Started.");
- }
- }
- }
-
- /// <summary>
- /// Throws <see cref="NotImplementedException"/>.
- /// </summary>
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get => throw new NotImplementedException();
- set => throw new NotImplementedException();
- }
-
- /// <summary>
- /// Throws <see cref="NotImplementedException"/>.
- /// </summary>
- public ProducerTransformerDelegate ProducerTransformer
- {
- get => throw new NotImplementedException();
- set => throw new NotImplementedException();
- }
-
- /// <summary>
- /// AMQP Provider access to remote connection properties.
- /// This is used by <see cref="ConnectionProviderUtilities.GetRemotePeerConnectionProperties(Apache.NMS.IConnection)"/>.
- /// </summary>
- public StringDictionary RemotePeerProperties
- {
- get { return PropertyUtil.Clone(this.Info.RemotePeerProperies); }
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.MetaData"/>.
- /// </summary>
- public IConnectionMetaData MetaData
- {
- get
- {
- return ConnectionMetaData.Version;
- }
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.RedeliveryPolicy"/>.
- /// </summary>
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get { return this.redeliveryPolicy; }
- set
- {
- if (value != null)
- {
- this.redeliveryPolicy = value;
- }
- }
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.RequestTimeout"/>.
- /// </summary>
- public TimeSpan RequestTimeout
- {
- get
- {
- return TimeSpan.FromMilliseconds(this.connInfo.requestTimeout);
- }
-
- set
- {
- connInfo.requestTimeout = Convert.ToInt64(value.TotalMilliseconds);
- }
- }
-
- /// <summary>
- /// Not Implemented, throws <see cref="NotImplementedException"/>.
- /// </summary>
- public event ConnectionInterruptedListener ConnectionInterruptedListener
- {
- add => throw new NotImplementedException("AMQP Provider does not implement ConnectionInterruptedListener events.");
- remove => throw new NotImplementedException("AMQP Provider does not implement ConnectionInterruptedListener events.");
- }
-
- /// <summary>
- /// Not Implemented, throws <see cref="NotImplementedException"/>.
- /// </summary>
- public event ConnectionResumedListener ConnectionResumedListener
- {
- add => throw new NotImplementedException("AMQP Provider does not implement ConnectionResumedListener events.");
- remove => throw new NotImplementedException("AMQP Provider does not implement ConnectionResumedListener events.");
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.ExceptionListener"/>.
- /// </summary>
- public event ExceptionListener ExceptionListener;
-
- /// <summary>
- /// Creates a <see cref="Apache.NMS.ISession"/> with
- /// the connection <see cref="Apache.NMS.IConnection.AcknowledgementMode"/>.
- /// </summary>
- /// <returns>An <see cref="Apache.NMS.ISession"/> provider instance.</returns>
- public Apache.NMS.ISession CreateSession()
- {
- return CreateSession(acknowledgementMode);
- }
-
- /// <summary>
- /// Creates a <see cref="Apache.NMS.ISession"/> with the given <see cref="Apache.NMS.AcknowledgementMode"/> parameter.
- /// <para>
- /// Throws <see cref="NotImplementedException"/> for the <see cref="Apache.NMS.AcknowledgementMode.Transactional"/>.
- /// </para>
- /// </summary>
- /// <param name="acknowledgementMode"></param>
- /// <returns></returns>
- public Apache.NMS.ISession CreateSession(AcknowledgementMode acknowledgementMode)
- {
- this.CheckIfClosed();
- this.Connect();
- Session ses = new Session(this);
- ses.AcknowledgementMode = acknowledgementMode;
- try
- {
- ses.Begin();
- }
- catch(NMSException) { throw; }
- catch(Exception ex)
- {
- throw ExceptionSupport.Wrap(ex, "Failed to establish amqp Session.");
- }
-
- if(!this.sessions.TryAdd(ses.Id, ses))
- {
- Tracer.ErrorFormat("Failed to add Session {0}.", ses.Id);
- }
- Tracer.InfoFormat("Created Session {0} on connection {1}.", ses.Id, this.ClientId);
- return ses;
- }
-
- /// <summary>
- /// Destroys all temporary destinations for the connection.
- /// <para>
- /// Throws <see cref="IllegalStateException"/> should any Temporary Topic or Temporary Queue in
- /// the connection have an active consumer using it.
- /// </para>
- /// </summary>
- public void PurgeTempDestinations()
- {
- foreach(TemporaryDestination temp in temporaryLinks.Keys.ToArray())
- {
- this.DestroyTemporaryDestination(temp);
- }
- }
-
- /// <summary>
- /// See <see cref="Apache.NMS.IConnection.Close"/>.
- /// </summary>
- public void Close()
- {
- Dispose(true);
- if (this.IsClosed)
- {
- GC.SuppressFinalize(this);
- }
- }
-
- #endregion
-
- #region IDisposable Methods
-
-
- protected virtual void Dispose(bool disposing)
- {
- if (IsClosed) return;
- Tracer.DebugFormat("Closing of Connection {0}", ClientId);
- if (disposing)
- {
- // full shutdown
- if (this.closing.CompareAndSet(false, true))
- {
- Session[] connectionSessions = GetSessions();
- foreach (Session s in connectionSessions)
- {
- try
- {
- s.CheckOnDispatchThread();
- }
- catch
- {
- this.closing.Value = false;
- throw;
- }
- }
-
- this.Stop();
-
- this.temporaryLinks.Close();
-
- try
- {
- this.Disconnect();
- }
- catch (Exception ex)
- {
- // log network errors
- NMSException nmse = ExceptionSupport.Wrap(ex, "Amqp Connection close failure for NMS Connection {0}", this.Id);
- Tracer.DebugFormat("Caught Exception while closing Amqp Connection {0}. Exception {1}", this.Id, nmse);
- }
- finally
- {
- sessions?.Clear();
- sessions = null;
- if (this.state.Value.Equals(ConnectionState.CLOSED))
- {
- if (this.exceptionExecutor != null)
- {
- this.exceptionExecutor.Close();
- this.exceptionExecutor = null;
- }
- }
- }
- }
- }
- }
-
- public void Dispose()
- {
- try
- {
- this.Close();
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Caught Exception while Disposing of NMS Connection {0}. Exception {1}", this.ClientId, ex);
- }
- }
-
- #endregion
-
- #region NMSResource Methods
-
- public override bool IsStarted { get { return !mode.Value.Equals(Resource.Mode.Stopped); } }
-
- protected override void ThrowIfClosed()
- {
- this.CheckIfClosed();
- }
-
- protected override void StartResource()
- {
- this.Connect();
-
- if (!IsConnected)
- {
- throw new NMSConnectionException("Connection Failed to connect to Client.");
- }
-
- //start sessions here
- foreach (Session s in GetSessions())
- {
- s.Start();
- }
-
- }
-
- protected override void StopResource()
- {
- if ( this.impl != null && !this.impl.IsClosed)
- {
- // stop all sessions here.
- foreach (Session s in GetSessions())
- {
- s.Stop();
- }
- }
- }
-
- #endregion
-
- public override string ToString()
- {
- return "Connection:\nConnection Info:"+connInfo.ToString();
- }
-
-
- }
-
- #region Connection Provider Utilities
-
- /// <summary>
- /// This give access to provider specific functions and capabilities for a provider connection.
- /// </summary>
- public static class ConnectionProviderUtilities
- {
- public static bool IsAMQPConnection(Apache.NMS.IConnection connection)
- {
- return connection != null && connection is Apache.NMS.AMQP.Connection;
- }
-
- public static StringDictionary GetRemotePeerConnectionProperties(Apache.NMS.IConnection connection)
- {
- if (connection == null)
- {
- return null;
- }
- else if (connection is Apache.NMS.AMQP.Connection)
- {
- return (connection as Apache.NMS.AMQP.Connection).RemotePeerProperties;
- }
- return null;
- }
- }
-
- #endregion
-
- #region Connection Information inner Class
-
- internal class ConnectionInfo : ResourceInfo
- {
- static ConnectionInfo()
- {
- Amqp.ConnectionFactory defaultCF = new Amqp.ConnectionFactory();
- AmqpSettings defaultAMQPSettings = defaultCF.AMQP;
-
- DEFAULT_CHANNEL_MAX = defaultAMQPSettings.MaxSessionsPerConnection;
- DEFAULT_MAX_FRAME_SIZE = defaultAMQPSettings.MaxFrameSize;
- DEFAULT_IDLE_TIMEOUT = defaultAMQPSettings.IdleTimeout;
-
- DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
-
- }
- public const long INFINITE = -1;
- public const long DEFAULT_CONNECT_TIMEOUT = 15000;
- public const int DEFAULT_CLOSE_TIMEOUT = 15000;
- public static readonly long DEFAULT_REQUEST_TIMEOUT;
- public static readonly long DEFAULT_IDLE_TIMEOUT;
-
- public static readonly ushort DEFAULT_CHANNEL_MAX;
- public static readonly int DEFAULT_MAX_FRAME_SIZE;
-
- public ConnectionInfo() : this(null) { }
- public ConnectionInfo(Id clientId) : base(clientId)
- {
- if (clientId != null)
- this.clientId = clientId.ToString();
- }
-
- private Id ClientId = null;
-
- public override Id Id
- {
- get
- {
- if (base.Id == null)
- {
- if (ClientId == null && clientId != null)
- {
- ClientId = new Id(clientId);
- }
- return ClientId;
- }
- else
- {
- return base.Id;
- }
- }
- }
-
- internal Id ResourceId
- {
- set
- {
- if(ClientId == null && value != null)
- {
- ClientId = value;
- clientId = ClientId.ToString();
- }
-
- }
- }
-
- internal Uri remoteHost { get; set; }
- public string clientId { get; internal set; } = null;
- public string username { get; set; } = null;
- public string password { get; set; } = null;
-
- public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long connectTimeout { get; set; } = DEFAULT_CONNECT_TIMEOUT;
- public int closeTimeout { get; set; } = DEFAULT_CLOSE_TIMEOUT;
- public long idleTimout { get; set; } = DEFAULT_IDLE_TIMEOUT;
-
- public ushort channelMax { get; set; } = DEFAULT_CHANNEL_MAX;
- public int maxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
-
- public string TopicPrefix { get; internal set; } = null;
-
- public string QueuePrefix { get; internal set; } = null;
-
- public bool IsAnonymousRelay { get; internal set; } = false;
-
- public bool IsDelayedDelivery { get; internal set; } = false;
-
- public Message.Cloak.AMQPObjectEncodingType? EncodingType { get; internal set; } = null;
-
-
- public IList<string> Capabilities { get { return new List<string>(capabilities); } }
-
- public bool HasCapability(string capability)
- {
- return capabilities.Contains(capability);
- }
-
- public void AddCapability(string capability)
- {
- if (capability != null && capability.Length > 0)
- capabilities.Add(capability);
- }
-
- public StringDictionary RemotePeerProperies { get => remoteConnectionProperties; }
-
- private StringDictionary remoteConnectionProperties = new StringDictionary();
- private List<string> capabilities = new List<string>();
-
- public override string ToString()
- {
- string result = "";
- result += "connInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
-
- if (prop.GetGetMethod(true).IsPublic)
- {
- if (prop.GetGetMethod(true).ReturnParameter.ParameterType.IsEquivalentTo(typeof(List<string>)))
- {
- result += string.Format("{0} = {1},\n", prop.Name, PropertyUtil.ToString(prop.GetValue(this,null) as IList));
- }
- else
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
-
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- }
-
- #endregion
-
-}
diff --git a/src/NMS.AMQP/ConnectionFactory.cs b/src/NMS.AMQP/ConnectionFactory.cs
index ba9e9c8..4bc722d 100644
--- a/src/NMS.AMQP/ConnectionFactory.cs
+++ b/src/NMS.AMQP/ConnectionFactory.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,452 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
-using System.Collections;
-using System.Collections.Specialized;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.Policies;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Transport;
-using Apache.NMS.AMQP.Transport.AMQP;
-using Apache.NMS.AMQP.Transport.Secure;
-using Apache.NMS.AMQP.Transport.Secure.AMQP;
-using System.Security.Cryptography.X509Certificates;
-using System.Net.Security;
-using System.Security.Authentication;
-using Amqp;
namespace Apache.NMS.AMQP
{
- internal delegate Task<Amqp.Connection> ProviderCreateConnection(Amqp.Address addr, Amqp.Framing.Open open, Amqp.OnOpened onOpened);
- /// <summary>
- /// Apache.NMS.AMQP.ConnectionFactory implements Apache.NMS.IConnectionFactory.
- /// Apache.NMS.AMQP.ConnectionFactory creates, manages and configures the Amqp.ConnectionFactory used to create Amqp Connections.
- /// </summary>
- public class ConnectionFactory : Apache.NMS.IConnectionFactory
+ public class ConnectionFactory : NmsConnectionFactory
{
-
- public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";
- internal static readonly string CLIENT_ID_PROP = PropertyUtil.CreateProperty("ClientId", "", ConnectionPropertyPrefix);
- internal static readonly string USERNAME_PROP = PropertyUtil.CreateProperty("UserName", "", ConnectionPropertyPrefix);
- internal static readonly string PASSWORD_PROP = PropertyUtil.CreateProperty("Password", "", ConnectionPropertyPrefix);
-
- internal const string ConnectionPropertyPrefix = "connection.";
- internal const string ConnectionPropertyAlternativePrefix = PropertyUtil.PROPERTY_PREFIX;
- internal const string TransportPropertyPrefix = "transport.";
-
- private Amqp.Address amqpHost = null;
- private Uri brokerUri;
- private string clientId;
- private IdGenerator clientIdGenerator = new IdGenerator();
-
- private StringDictionary properties = new StringDictionary();
- private StringDictionary applicationProperties = null;
-
- private TransportPropertyInterceptor transportProperties;
- private ConnectionFactoryPropertyInterceptor connectionProperties;
- private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-
- private Amqp.ConnectionFactory impl;
- private TransportContext transportContext;
-
- #region Constructor Methods
-
public ConnectionFactory()
- : this(DEFAULT_BROKER_URL)
- {
- }
-
- public ConnectionFactory(string brokerUri)
- : this(URISupport.CreateCompatibleUri(brokerUri), null, null)
- {
-
- }
-
- public ConnectionFactory(string brokerUri, string clientId)
- : this(URISupport.CreateCompatibleUri(brokerUri), clientId, null)
- {
- }
-
- public ConnectionFactory(Uri brokerUri)
- : this(brokerUri, null, null)
- { }
-
- public ConnectionFactory(Uri brokerUri, StringDictionary props)
- : this(brokerUri, null, props)
- { }
-
- public ConnectionFactory(Uri brokerUri, string clientId, StringDictionary props)
- {
- impl = new Amqp.ConnectionFactory();
- this.clientId = clientId;
- if (props != null)
- {
- this.InitApplicationProperties(props);
- }
- BrokerUri = brokerUri;
- impl.AMQP.HostName = BrokerUri.Host;
- //
- // Set up tracing in AMQP. We capture all AMQP traces in the TraceListener below
- // and map to NMS 'Tracer' logs as follows:
- // AMQP Tracer
- // Verbose Debug
- // Frame Debug
- // Information Info
- // Output Info (should not happen)
- // Warning Warn
- // Error Error
- //
- Amqp.Trace.TraceLevel = Amqp.TraceLevel.Verbose | Amqp.TraceLevel.Frame;
- Amqp.Trace.TraceListener = (level, format, args) =>
- {
- switch (level)
- {
- case Amqp.TraceLevel.Verbose:
- case Amqp.TraceLevel.Frame:
- Tracer.DebugFormat(format, args);
- break;
- case Amqp.TraceLevel.Information:
- case Amqp.TraceLevel.Output:
- //
- // Applications should not access AmqpLite directly so there
- // should be no 'Output' level logs.
- Tracer.InfoFormat(format, args);
- break;
- case Amqp.TraceLevel.Warning:
- Tracer.WarnFormat(format, args);
- break;
- case Amqp.TraceLevel.Error:
- Tracer.ErrorFormat(format, args);
- break;
- default:
- Tracer.InfoFormat("Unknown AMQP LogLevel: {}", level);
- Tracer.InfoFormat(format, args);
- break;
- }
- };
-
- }
-
- #endregion
-
- #region Connection Factory Properties
-
- internal bool IsClientIdSet
- {
- get => this.clientId == null;
- }
-
- public string ClientId
- {
- get { return this.clientId; }
- internal set
- {
- this.clientId = value;
- }
- }
-
-
- private IdGenerator ClientIDGenerator
- {
- get
- {
- IdGenerator cig = clientIdGenerator;
- lock (this)
- {
- if (cig == null)
- {
- clientIdGenerator = new IdGenerator();
- cig = clientIdGenerator;
- }
- }
- return cig;
- }
- }
-
- internal Amqp.IConnectionFactory Factory { get => this.impl; }
-
- internal IProviderTransportContext Context { get => this.transportContext; }
-
- #endregion
-
- #region IConnection Members
-
- public Uri BrokerUri
{
- get { return brokerUri; }
- set
- {
- brokerUri = value;
- if (value != null)
- {
- amqpHost = UriUtil.ToAddress(value);
- }
- else
- {
- amqpHost = null;
- }
- InitTransportProperties();
- UpdateConnectionProperties();
- }
}
- public ConsumerTransformerDelegate ConsumerTransformer
+ public ConnectionFactory(Uri brokerUri) : base(brokerUri)
{
- get => throw new NotImplementedException();
- set => throw new NotImplementedException();
}
- public ProducerTransformerDelegate ProducerTransformer
+ public ConnectionFactory(string brokerUri) : base(brokerUri)
{
- get => throw new NotImplementedException();
- set => throw new NotImplementedException();
- }
-
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get
- {
- if (redeliveryPolicy == null)
- {
- this.redeliveryPolicy = new RedeliveryPolicy();
- }
- return this.redeliveryPolicy;
- }
- set
- {
- if (value != null)
- {
- this.redeliveryPolicy = value;
- }
- }
- }
-
- public Apache.NMS.IConnection CreateConnection()
- {
- try
- {
- Connection conn = new Connection(brokerUri, ClientIDGenerator);
-
- Tracer.Info("Configuring Connection Properties");
-
- bool shouldSetClientID = this.clientId != null;
-
- conn.Configure(this);
-
- if (shouldSetClientID)
- {
- conn.ClientId = this.clientId;
-
- conn.Connect();
- }
-
- return conn;
-
- }
- catch (Exception ex)
- {
- if (ex is NMSException)
- {
- throw ex;
- }
- else
- {
- throw new NMSException(ex.Message, ex);
- }
- }
- }
-
- public Apache.NMS.IConnection CreateConnection(string userName, string password)
- {
-
- if(ConnectionProperties.ContainsKey(USERNAME_PROP))
- {
- ConnectionProperties[USERNAME_PROP] = userName;
- }
- else
- {
- ConnectionProperties.Add(USERNAME_PROP, userName);
- }
-
- if (ConnectionProperties.ContainsKey(PASSWORD_PROP))
- {
- ConnectionProperties[PASSWORD_PROP] = password;
- }
- else
- {
- ConnectionProperties.Add(PASSWORD_PROP, password);
- }
-
- return CreateConnection();
- }
-
-
-
- #endregion
- #region AMQP Connection Properties
- public Amqp.TraceLevel AMQPlogLevel
- {
- get { return this.AMQPlogLevel; }
- set
- {
- if (null != this.transportContext)
- {
- this.AMQPlogLevel = value;
- Amqp.Trace.TraceLevel = value;
- }
-
- }
- }
- #endregion
-
- #region SSLConnection Methods
-
- public RemoteCertificateValidationCallback CertificateValidationCallback
- {
- get
- {
- return (IsSSL) ? (transportContext as IProviderSecureTransportContext).ServerCertificateValidateCallback : null;
- }
- set
- {
- if (IsSSL)
- {
- (transportContext as IProviderSecureTransportContext).ServerCertificateValidateCallback = value;
- }
- }
- }
-
- public LocalCertificateSelectionCallback LocalCertificateSelect
- {
- get
- {
- return (IsSSL) ? (transportContext as IProviderSecureTransportContext).ClientCertificateSelectCallback : null;
- }
- set
- {
- if (IsSSL)
- {
- (transportContext as IProviderSecureTransportContext).ClientCertificateSelectCallback = value;
- }
- }
- }
-
- public bool IsSSL
- {
- get
- {
- return amqpHost?.UseSsl ?? false;
- }
}
- private void InitTransportProperties()
+ public ConnectionFactory(string userName, string password, string brokerUri) : base(userName, password, brokerUri)
{
- if (IsSSL)
- {
- SecureTransportContext stc = new SecureTransportContext(this);
- this.transportContext = stc;
- }
- else
- {
- this.transportContext = new TransportContext(this);
- }
-
- StringDictionary queryProps = URISupport.ParseParameters(this.brokerUri);
- StringDictionary transportProperties = URISupport.GetProperties(queryProps, TransportPropertyPrefix);
- if (this.applicationProperties != null)
- {
- StringDictionary appTProps = URISupport.GetProperties(this.applicationProperties, TransportPropertyPrefix);
- transportProperties = PropertyUtil.Merge(transportProperties, appTProps, string.Empty, string.Empty, TransportPropertyPrefix);
- }
- PropertyUtil.SetProperties(this.transportContext, transportProperties, TransportPropertyPrefix);
- if (IsSSL)
- {
- this.transportProperties = new SecureTransportPropertyInterceptor(this.transportContext as IProviderSecureTransportContext, transportProperties);
- }
- else
- {
- this.transportProperties = new TransportPropertyInterceptor(this.transportContext, transportProperties);
- }
}
-
- private void InitApplicationProperties(StringDictionary props)
- {
- // copy properties to temporary dictionary
- StringDictionary result = PropertyUtil.Clone(props);
- // extract connections properties
- StringDictionary connProps = ExtractConnectionProperties(result);
- // initialize applications properties as the union of temp and conn properties
- this.applicationProperties = PropertyUtil.Merge(result, connProps, "", "", "");
-
- }
-
- private StringDictionary ExtractConnectionProperties(StringDictionary rawProps)
- {
- // find and extract properties with ConnectionPropertyPrefix
- StringDictionary connectionProperties = URISupport.ExtractProperties(rawProps, ConnectionPropertyPrefix);
- // find and extract properties with ConnectionPropertyAlternativePrefix
- StringDictionary connectionAlternativeProperties = URISupport.ExtractProperties(rawProps, ConnectionPropertyAlternativePrefix);
- // return Union of Conn and AltConn properties prefering Conn over AltConn.
- return PropertyUtil.Merge(connectionProperties, connectionAlternativeProperties, ConnectionPropertyPrefix, ConnectionPropertyAlternativePrefix, ConnectionPropertyPrefix);
- }
-
- private StringDictionary CreateConnectionProperties(StringDictionary rawProps)
- {
- // read properties with ConnectionPropertyPrefix
- StringDictionary connectionProperties = URISupport.GetProperties(rawProps, ConnectionPropertyPrefix);
- // read properties with ConnectionPropertyAlternativePrefix
- StringDictionary connectionAlternativeProperties = URISupport.GetProperties(rawProps, ConnectionPropertyAlternativePrefix);
- // return Union of Conn and AltConn properties prefering Conn over AltConn.
- return PropertyUtil.Merge(connectionProperties, connectionAlternativeProperties, ConnectionPropertyPrefix, ConnectionPropertyAlternativePrefix, ConnectionPropertyPrefix);
- }
-
- private void UpdateConnectionProperties()
- {
- StringDictionary queryProps = URISupport.ParseParameters(this.brokerUri);
- StringDictionary brokerConnectionProperties = CreateConnectionProperties(queryProps);
- if (this.applicationProperties != null)
- {
- // combine connection properties with application properties prefering URI properties over application
- this.properties = PropertyUtil.Merge(brokerConnectionProperties, applicationProperties, "", "", "");
- }
- else
- {
- this.properties = brokerConnectionProperties;
- }
- // update connection factory members.
- connectionProperties = new ConnectionFactoryPropertyInterceptor(this, this.properties);
- }
- #endregion
-
- #region Connection Factory Property Methods
-
- public StringDictionary TransportProperties
- {
- get { return this.transportProperties; }
- }
-
- #endregion
-
- #region Connection Properties Methods
-
- public StringDictionary ConnectionProperties
- {
- get { return this.connectionProperties; }
- }
-
- public bool HasConnectionProperty(string key)
- {
- return this.properties.ContainsKey(key);
- }
-
- #endregion
}
-
-
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/ConnectionMetaData.cs b/src/NMS.AMQP/ConnectionMetaData.cs
index ecbf5f9..70fdd4f 100644
--- a/src/NMS.AMQP/ConnectionMetaData.cs
+++ b/src/NMS.AMQP/ConnectionMetaData.cs
@@ -70,7 +70,7 @@ namespace Apache.NMS.AMQP
private readonly int NMSMinor;
private ConnectionMetaData()
{
- Assembly assembly = Assembly.GetAssembly(typeof(ConnectionFactory));
+ Assembly assembly = Assembly.GetAssembly(typeof(NmsConnectionFactory));
AssemblyVersion = assembly.GetName().Version.ToString();
ProviderName = assembly.GetName().Name;
diff --git a/src/NMS.AMQP/Destination.cs b/src/NMS.AMQP/Destination.cs
deleted file mode 100644
index a998b4d..0000000
--- a/src/NMS.AMQP/Destination.cs
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP
-{
- #region Destination Implementation
- /// <summary>
- /// Apache.NMS.AMQP.Destination implements Apache.NMS.IDestination
- /// Destionation is an abstract container for a Queue or Topic.
- /// </summary>
- abstract class Destination : IDestination
- {
-
- protected string destinationName;
- protected Connection connection;
- private readonly bool queue;
-
- #region Constructor
-
- internal Destination(Connection conn, string name, bool isQ)
- {
- queue = isQ;
- ValidateName(name);
- destinationName = name;
- connection = conn;
-
- }
-
- internal Destination(Destination other)
- {
- this.queue = other.queue;
- destinationName = other.destinationName;
- connection = other.connection;
- }
-
- #endregion
-
- #region Abstract Methods
-
- protected abstract void ValidateName(string name);
-
- #endregion
-
- #region IDestination Properties
-
- public virtual DestinationType DestinationType
- {
- get
- {
- throw new NotImplementedException();
- }
- }
-
- public virtual bool IsQueue
- {
- get
- {
- return queue;
- }
- }
-
- public virtual bool IsTemporary
- {
- get
- {
- return false;
- }
- }
-
- public virtual bool IsTopic
- {
- get
- {
- return !queue;
- }
- }
-
- #endregion
-
- #region IDisposable Methods
-
- public void Dispose()
- {
- try
- {
- this.Dispose(true);
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Caught Exception while disposing of {0} : {1}. Exception : {2}",
- this.DestinationType, this.destinationName, ex);
- }
- }
-
- protected virtual void Dispose(bool disposing)
- {
-
- }
-
- #endregion
-
- #region Object Comparison Methods
-
- public override string ToString()
- {
- return base.ToString() + ":" + destinationName;
- }
-
- public virtual bool Equals (Destination other)
- {
- return this.DestinationType == other.DestinationType && this.destinationName.Equals(other.destinationName);
- }
-
- public virtual bool Equals (IDestination destination)
- {
- if (this.DestinationType == destination.DestinationType)
- {
- if (destination is Destination)
- {
- return this.Equals(destination as Destination);
- }
- else
- {
- string destName = destination.IsTopic ? (destination as ITopic).TopicName : (destination as IQueue).QueueName;
- return (destName != null && destName.Length > 0) ? destName.CompareTo(this.destinationName) == 0 : false;
- }
- }
- return false;
- }
-
- public override bool Equals(object obj)
- {
- if (obj != null && obj is IDestination)
- {
- return this.Equals(obj as IDestination);
- }
- return false;
- }
-
- public override int GetHashCode()
- {
- return destinationName.GetHashCode() * 31 + DestinationType.GetHashCode();
- }
-
- #endregion
- }
-
- #endregion
-
- #region Temporary Destination Implementation
-
- /// <summary>
- /// Apache.NMS.AMQP.TemporaryDestination inherits NMS.AMQP.Destination
- /// Destionation is an abstract container for a Temporary Queue or Temporary Topic.
- /// </summary>
- abstract class TemporaryDestination : Destination
- {
-
- private readonly Id destinationId;
-
- private bool deleted = false;
-
- #region Constructor
-
- public TemporaryDestination(Connection conn, Id name, bool isQ) : base(conn, name.ToString(), isQ)
- {
- destinationId = name;
- }
-
- public TemporaryDestination(Connection conn, string name, bool isQ) : base(conn, name, isQ)
- {
- destinationId = new Id(name);
- }
-
- #endregion
-
- #region Temporary Destination Properties
-
- internal Connection Connection
- {
- get { return connection; }
- }
-
- internal Id DestinationId
- {
- get
- {
- return destinationId;
- }
- }
-
- internal bool IsDeleted { get => deleted; }
-
- internal string DestinationName
- {
- get => base.destinationName;
- set => base.destinationName = value;
- }
-
- #endregion
-
- #region Temporary Destination Methods
-
- public virtual void Delete()
- {
- if (connection != null)
- {
- this.connection.DestroyTemporaryDestination(this);
- connection = null;
- }
- deleted = true;
- }
-
- #endregion
-
- #region IDestination Methods
-
- public override bool IsTemporary
- {
- get
- {
- return true;
- }
- }
-
- #endregion
-
- #region IDisposable Methods
-
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- this.Delete();
- }
- }
-
- #endregion
-
- #region Object Comparison Methods
-
- public override int GetHashCode()
- {
- return destinationId.GetHashCode();
- }
-
- public override bool Equals(Destination other)
- {
- if(other is TemporaryDestination)
- {
- return (other as TemporaryDestination).destinationId.Equals(this.destinationId)
- || base.Equals(other);
- }
- return base.Equals(other);
- }
-
- #endregion
-
- }
-
- #endregion
-
- #region Destination Transformation
-
- internal class DestinationTransformation
- {
- public static Destination Transform(Connection connection, IDestination destination)
- {
- Destination transformDestination = null;
- if (destination == null)
- return null;
-
- if (destination is Destination)
- {
- return destination as Destination;
- }
- string destinationName = null;
-
- DestinationType type = destination.DestinationType;
- switch (type)
- {
- case DestinationType.Queue:
- case DestinationType.TemporaryQueue:
- destinationName = (destination as IQueue).QueueName;
- break;
- case DestinationType.Topic:
- case DestinationType.TemporaryTopic:
- destinationName = (destination as ITopic).TopicName;
- break;
- default:
- throw new NMSException(string.Format("Unresolved destination. Unrecognized destination Type {0} for IDesintation {1}", type, destination?.ToString()));
- }
-
- if(destinationName == null)
- {
- throw new NMSException(string.Format("Unresolved destination. Could not resolved destination name for destination {0} type {1}.", destination?.ToString(), type));
- }
-
- switch (type)
- {
- case DestinationType.Queue:
- transformDestination = new Queue(connection, destinationName);
- break;
- case DestinationType.TemporaryQueue:
- transformDestination = new TemporaryQueue(connection, destinationName);
- break;
- case DestinationType.Topic:
- transformDestination = new Topic(connection, destinationName);
- break;
- case DestinationType.TemporaryTopic:
- transformDestination = new TemporaryTopic(connection, destinationName);
- break;
- }
-
- return transformDestination;
- }
- }
-
- #endregion
-}
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/INmsConnectionListener.cs
similarity index 65%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/INmsConnectionListener.cs
index 118b5ab..a29d90b 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/INmsConnectionListener.cs
@@ -14,23 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP
{
- internal interface IBytesMessageCloak : IMessageCloak
+ public interface INmsConnectionListener
{
- BinaryReader getDataReader();
- BinaryWriter getDataWriter();
-
- new IBytesMessageCloak Copy();
-
- int BodyLength { get; }
- void Reset();
+ void OnConsumerClosed(IMessageConsumer consumer, Exception exception);
+ void OnConnectionEstablished(Uri remoteUri);
+ void OnConnectionRestored(Uri remoteUri);
+ void OnConnectionFailure(NMSException exception);
+ void OnConnectionInterrupted(Uri remoteUri);
+ void OnProducerClosed(NmsMessageProducer messageProducer, Exception error);
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs
deleted file mode 100644
index 09d9ea9..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-
- using Cloak;
- using Util;
- using Factory;
- class AMQPBytesMessageCloak : AMQPMessageCloak, IBytesMessageCloak
- {
-
- private static readonly Data EMPTY_DATA;
-
- static AMQPBytesMessageCloak()
- {
- EMPTY_DATA = new Data();
- EMPTY_DATA.Binary = new byte[0];
- }
-
- private EndianBinaryReader byteIn=null;
- private EndianBinaryWriter byteOut=null;
-
- internal AMQPBytesMessageCloak(Connection c) : base(c)
- {
- Content = null;
- }
-
- internal AMQPBytesMessageCloak(MessageConsumer c, Amqp.Message msg) : base (c, msg) { }
-
- internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_BYTE; } }
-
- public override byte[] Content
- {
- get
- {
- return this.GetBinaryFromBody().Binary;
- }
- set
- {
- Data result = EMPTY_DATA;
- if (value != null && value.Length>0)
- {
- result = new Data();
- result.Binary = value;
- }
- this.message.BodySection = result;
- base.Content = result.Binary;
- }
- }
-
- public int BodyLength
- {
- get
- {
-
- return Content != null ? Content.Length : -1;
- }
- }
-
- public BinaryReader getDataReader()
- {
- if(byteOut != null)
- {
- throw new IllegalStateException("Cannot read message while writing.");
- }
- if (byteIn == null)
- {
- byte[] data = Content;
- if (Content == null)
- {
- data = EMPTY_DATA.Binary;
- }
- Stream dataStream = new MemoryStream(data, false);
-
- byteIn = new EndianBinaryReader(dataStream);
- }
- return byteIn;
- }
- public BinaryWriter getDataWriter()
- {
- if (byteIn != null)
- {
- throw new IllegalStateException("Cannot write message while reading.");
- }
- if (byteOut == null)
- {
- MemoryStream outputBuffer = new MemoryStream();
- this.byteOut = new EndianBinaryWriter(outputBuffer);
- message.BodySection = EMPTY_DATA;
-
- }
- return byteOut;
- }
-
- public void Reset()
- {
- if (byteOut != null)
- {
-
- MemoryStream byteStream = new MemoryStream((int)byteOut.BaseStream.Length);
- byteOut.BaseStream.Position = 0;
- byteOut.BaseStream.CopyTo(byteStream);
-
- byte[] value = byteStream.ToArray();
- Content = value;
-
- byteStream.Close();
- byteOut.Close();
- byteOut = null;
- }
- if (byteIn != null)
- {
- byteIn.Close();
- byteIn = null;
- }
- }
-
- IBytesMessageCloak IBytesMessageCloak.Copy()
- {
- IBytesMessageCloak bcloak = new AMQPBytesMessageCloak(connection);
- this.CopyInto(bcloak);
- return bcloak;
- }
-
- public override void ClearBody()
- {
- this.Reset();
- Content = null;
- }
-
- protected override void CopyInto(IMessageCloak msg)
- {
- base.CopyInto(msg);
- this.Reset();
- IBytesMessageCloak bmsg = msg as IBytesMessageCloak;
- bmsg.Content = this.Content;
-
-
- }
-
- private Data GetBinaryFromBody()
- {
- RestrictedDescribed body = message.BodySection;
- Data result = EMPTY_DATA;
- if(body == null)
- {
- return result;
- }
- else if (body is Data)
- {
- byte[] binary = (body as Data).Binary;
- if(binary != null && binary.Length != 0)
- {
- return body as Data;
- }
- }
- else if (body is AmqpValue)
- {
- object value = (body as AmqpValue).Value;
- if(value == null) { return result; }
- if(value is byte[])
- {
- byte[] dataValue = value as byte[];
- if (dataValue.Length > 0)
- {
- result = new Data();
- result.Binary = dataValue;
- }
- }
- else
- {
- throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
- }
- }
- else
- {
- throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
- }
-
- return result;
- }
- public override string ToString()
- {
- string result = base.ToString();
- if (this.Content != null)
- {
- result += string.Format("\nMessage Body: {0}\n", this.Content.ToString());
- }
- return result;
- }
- }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs
deleted file mode 100644
index 9754783..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- using Cloak;
- using Factory;
- using Util;
- using Util.Types;
- using Util.Types.Map.AMQP;
- class AMQPMapMessageCloak : AMQPMessageCloak, IMapMessageCloak
- {
- private IPrimitiveMap map = null;
- private Map amqpmap = null;
-
-
- internal AMQPMapMessageCloak(Connection conn) : base(conn)
- {
- InitializeMapBody();
- }
-
- internal AMQPMapMessageCloak(MessageConsumer c, Amqp.Message msg) : base(c, msg)
- {
- InitializeMapBody();
- }
-
- internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MAP; } }
-
- private void InitializeMapBody()
- {
- if (message.BodySection == null)
- {
- amqpmap = new Map();
- map = new AMQPValueMap(amqpmap);
- AmqpValue val = new AmqpValue();
- val.Value = amqpmap;
- message.BodySection = val;
- }
- else
- {
- if (message.BodySection is AmqpValue)
- {
- object obj = (message.BodySection as AmqpValue).Value;
- if (obj == null)
- {
- amqpmap = new Map();
- map = new AMQPValueMap(amqpmap);
- (message.BodySection as AmqpValue).Value = amqpmap;
- }
- else if (obj is Map)
- {
- amqpmap = obj as Map;
- map = new AMQPValueMap(amqpmap);
- }
- else
- {
- throw new NMSException(string.Format("Invalid message body value type. Type: {0}.", obj.GetType().Name));
- }
- }
- else
- {
- throw new NMSException("Invalid message body type.");
- }
- }
-
- }
-
- IPrimitiveMap IMapMessageCloak.Map
- {
- get
- {
- return map;
- }
- }
-
- IMapMessageCloak IMapMessageCloak.Copy()
- {
- IMapMessageCloak copy = new AMQPMapMessageCloak(Connection);
- CopyInto(copy);
- return copy;
- }
-
- protected override void CopyInto(IMessageCloak msg)
- {
- base.CopyInto(msg);
- IPrimitiveMap copy = (msg as IMapMessageCloak).Map;
- foreach (string key in this.map.Keys)
- {
- object value = map[key];
- if (value != null)
- {
- Type valType = value.GetType();
- if (valType.IsPrimitive)
- {
- // value copy primitive value
- copy[key] = value;
- }
- else if (valType.IsArray && valType.Equals(typeof(byte[])))
- {
- // use IPrimitive map SetBytes for most common implementation this is a deep copy.
- byte[] original = value as byte[];
- copy.SetBytes(key, original);
- }
- else if (valType.Equals(typeof(IDictionary)) || valType.Equals(typeof(Amqp.Types.Map)))
- {
- // reference copy
- copy.SetDictionary(key, value as IDictionary);
- }
- else if (valType.Equals(typeof(IList)) || valType.Equals(typeof(Amqp.Types.List)))
- {
- // reference copy
- copy.SetList(key, value as IList);
- }
- else
- {
- copy[key] = value;
- }
- }
- else
- {
- copy[key] = value;
- }
-
- }
- }
-
- public override string ToString()
- {
- string result = base.ToString();
- if(this.map != null)
- {
- result +=string.Format("\nMessage Body: {0}\n", ConversionSupport.ToString(this.map));
- }
- return result;
- }
-
- }
-
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs
deleted file mode 100644
index 95f1465..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- using Util;
- using Cloak;
-
- class AMQPMessageBuilder
- {
- public static IMessage CreateProviderMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IMessage msg = null;
- msg = CreateFromMessageAnnontations(consumer, message);
- if(msg == null)
- {
- msg = CreateFromMessageBody(consumer, message);
- }
- if(msg == null)
- {
- throw new NMSException("Could not create NMS Message.");
- }
- return msg;
- }
-
- private static IMessage CreateFromMessageBody(MessageConsumer consumer, Amqp.Message message)
- {
- IMessage msg = null;
- object body = message.Body;
- if(body == null)
- {
- if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
- {
- msg = CreateObjectMessage(consumer, message);
- }
- else if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
- {
- msg = CreateBytesMessage(consumer, message);
- }
- else
- {
- Symbol contentType = GetContentType(message);
- if(contentType != null)
- {
- msg = CreateTextMessage(consumer, message);
- }
- else
- {
- msg = CreateMessage(consumer, message);
- }
- }
- }
- else if (message.BodySection is Data)
- {
- if(IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
- {
- msg = CreateBytesMessage(consumer, message);
- }
- else if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
- {
- msg = CreateObjectMessage(consumer, message);
- }
- else
- {
- Symbol contentType = GetContentType(message);
- if(contentType != null)
- {
- msg = CreateTextMessage(consumer, message);
- }
- else
- {
- msg = CreateBytesMessage(consumer, message);
- }
- }
- }
- else if (message.BodySection is AmqpSequence)
- {
- msg = CreateObjectMessage(consumer, message);
- }
- else if (body is string)
- {
- msg = CreateTextMessage(consumer, message);
- }
- else if (body is byte[])
- {
- msg = CreateBytesMessage(consumer, message);
- }
- else
- {
- msg = CreateObjectMessage(consumer, message);
- }
-
- return msg;
- }
-
- private static Symbol GetContentType(Amqp.Message message)
- {
- Properties msgProps = message.Properties;
- if (msgProps == null)
- {
- return null;
- }
- else
- {
- return msgProps.ContentType;
- }
- }
-
- private static bool IsContentType(Symbol type, Amqp.Message message)
- {
- Symbol contentType = GetContentType(message);
- if (contentType == null)
- {
- return type == null;
- }
- else
- {
- return type.Equals(contentType);
- }
- }
-
- private static IMessage CreateFromMessageAnnontations(MessageConsumer consumer, Amqp.Message message)
- {
- IMessage msg = null;
- object objVal = message.MessageAnnotations[SymbolUtil.JMSX_OPT_MSG_TYPE];
- if(objVal != null && objVal is SByte)
- {
- byte type = Convert.ToByte(objVal);
- switch (type)
- {
- case MessageSupport.JMS_TYPE_MSG:
- msg = CreateMessage(consumer, message);
- break;
- case MessageSupport.JMS_TYPE_BYTE:
- msg = CreateBytesMessage(consumer, message);
- break;
- case MessageSupport.JMS_TYPE_TXT:
- msg = CreateTextMessage(consumer, message);
- break;
- case MessageSupport.JMS_TYPE_OBJ:
- msg = CreateObjectMessage(consumer, message);
- break;
- case MessageSupport.JMS_TYPE_STRM:
- msg = CreateStreamMessage(consumer, message);
- break;
- case MessageSupport.JMS_TYPE_MAP:
- msg = CreateMapMessage(consumer, message);
- break;
- default:
- throw new NMSException("Unsupported Msg Annontation type: " + type);
- }
-
- }
- return msg;
- }
-
- private static IMessage CreateMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IMessageCloak cloak = new AMQPMessageCloak(consumer, message);
- return new Message(cloak);
- }
-
- private static IMessage CreateTextMessage(MessageConsumer consumer, Amqp.Message message)
- {
- ITextMessageCloak cloak = new AMQPTextMessageCloak(consumer, message);
- return new TextMessage(cloak);
- }
-
- private static IMessage CreateStreamMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IStreamMessageCloak cloak = new AMQPStreamMessageCloak(consumer, message);
- return new StreamMessage(cloak);
- }
-
- private static IMessage CreateObjectMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IObjectMessageCloak cloak = new AMQPObjectMessageCloak(consumer, message);
- return new ObjectMessage(cloak);
- }
-
- private static IMessage CreateMapMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IMapMessageCloak cloak = new AMQPMapMessageCloak(consumer, message);
- return new MapMessage(cloak);
- }
-
- private static IMessage CreateBytesMessage(MessageConsumer consumer, Amqp.Message message)
- {
- IBytesMessageCloak cloak = new AMQPBytesMessageCloak(consumer, message);
- return new BytesMessage(cloak);
- }
-
- }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs
deleted file mode 100644
index dde222e..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs
+++ /dev/null
@@ -1,650 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
-using Amqp;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- using Util;
- using Util.Types.Map.AMQP;
- using Cloak;
- using Factory;
- using System.Reflection;
-
- class AMQPMessageCloak : IMessageCloak
- {
- private TimeSpan timeToLive;
- private IDestination replyTo;
- private bool redelivered = false;
- private string msgId;
- private IDestination destination;
- private string correlationId;
- private IPrimitiveMap properties;
- private MessagePropertyIntercepter propertyHelper;
-
- private Header messageHeader = null;
- private DeliveryAnnotations deliveryAnnontations = null;
- private MessageAnnotations messageAnnontations = null;
- private ApplicationProperties applicationProperties = null;
- private Properties messageProperties = null;
-
-#pragma warning disable CS0414
- private Footer messageFooter = null;
-#pragma warning restore CS0414
-
- private byte[] content;
- private bool readOnlyProperties = false;
-
- protected Amqp.Message message;
- protected readonly Connection connection;
- protected MessageConsumer consumer;
-
- internal AMQPMessageCloak(Connection c)
- {
- message = new Amqp.Message();
- connection = c;
- InitMessage();
- }
-
- internal AMQPMessageCloak(MessageConsumer c, Amqp.Message msg)
- {
- message = msg;
- consumer = c;
- connection = c.Session.Connection;
- InitMessage();
- InitDeliveryAnnotations();
- }
-
-
- #region Internal Properties
-
- internal Connection Connection { get { return connection; } }
-
- internal Amqp.Message AMQPMessage { get { return message; } }
-
- internal virtual byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MSG; } }
-
- #endregion
-
- private void InitMessage()
- {
- InitMessageHeader();
- InitMessageProperties();
- SetMessageAnnotation(SymbolUtil.JMSX_OPT_MSG_TYPE, (sbyte)JMSMessageType);
- }
-
- protected virtual void CopyInto(IMessageCloak msg)
- {
- MessageTransformation.CopyNMSMessageProperties(this, msg);
- msg.AckHandler = this.AckHandler;
- }
-
- #region Protected Amqp.Message Initialize/Accessor
-
- protected void InitMessageHeader()
- {
- if (this.messageHeader == null && this.message.Header == null)
- {
- this.messageHeader = new Header();
- this.message.Header = this.messageHeader;
- }
- else if (this.messageHeader == null && this.message.Header != null)
- {
- this.messageHeader = this.message.Header;
- }
- else if (this.messageHeader != null && this.message.Header == null)
- {
- this.message.Header = this.messageHeader;
- }
- }
-
- protected void InitMessageProperties()
- {
- if (this.messageProperties == null && this.message.Properties == null)
- {
- this.messageProperties = new Properties();
- this.message.Properties = this.messageProperties;
- }
- else if (this.messageProperties == null && this.message.Properties != null)
- {
- this.messageProperties = this.message.Properties;
- }
- else if (this.messageProperties != null && this.message.Properties == null)
- {
- this.message.Properties = this.messageProperties;
- }
- }
-
- protected void InitApplicationProperties()
- {
- if (this.applicationProperties == null && this.message.ApplicationProperties == null)
- {
- this.applicationProperties = new ApplicationProperties();
- this.message.ApplicationProperties = this.applicationProperties;
- }
- else if (this.applicationProperties == null && this.message.ApplicationProperties != null)
- {
- this.applicationProperties = this.message.ApplicationProperties;
- }
- else if (this.applicationProperties != null && this.message.ApplicationProperties == null)
- {
- this.message.ApplicationProperties = this.applicationProperties;
-
- }
- }
-
- protected void InitDeliveryAnnotations()
- {
- if (this.deliveryAnnontations == null && this.message.DeliveryAnnotations == null)
- {
- this.deliveryAnnontations = new DeliveryAnnotations();
- this.message.DeliveryAnnotations = this.deliveryAnnontations;
- }
- else if (this.deliveryAnnontations == null && this.message.DeliveryAnnotations != null)
- {
- this.deliveryAnnontations = this.message.DeliveryAnnotations;
- }
- else if (this.deliveryAnnontations != null && this.message.DeliveryAnnotations == null)
- {
- this.message.DeliveryAnnotations = this.deliveryAnnontations;
- }
- }
-
- protected void InitMessageAnnontations()
- {
- if (this.messageAnnontations == null && this.message.MessageAnnotations == null)
- {
- this.messageAnnontations = new MessageAnnotations();
- this.message.MessageAnnotations = messageAnnontations;
- }
- else if (this.messageAnnontations == null && this.message.MessageAnnotations != null)
- {
- this.messageAnnontations = this.message.MessageAnnotations;
- }
- else if (this.messageAnnontations != null && this.message.MessageAnnotations == null)
- {
- this.message.MessageAnnotations = this.messageAnnontations;
- }
- }
-
- protected void SetDeliveryAnnotation(Symbol key, object value)
- {
- InitDeliveryAnnotations();
- this.deliveryAnnontations[key] = value;
- }
-
- protected void SetMessageAnnotation(Symbol key, object value)
- {
- InitMessageAnnontations();
- messageAnnontations[key] = value;
- }
-
- protected object GetMessageAnnotation(Symbol key)
- {
- InitMessageAnnontations();
- return messageAnnontations[key];
- }
-
- #endregion
-
- #region IMessageCloak Properties
-
- public bool IsReceived { get { return consumer != null; } }
-
- public virtual byte[] Content
- {
- get
- {
- return content;
- }
-
- set
- {
- content = value;
- }
- }
-
- public virtual bool IsBodyReadOnly { get; set; }
-
- public virtual bool IsPropertiesReadOnly
- {
- get
- {
- return (this.propertyHelper == null) ? readOnlyProperties : this.propertyHelper.ReadOnly;
- }
- set
- {
- if (this.propertyHelper != null)
- this.propertyHelper.ReadOnly = value;
- readOnlyProperties = value;
- }
- }
-
-
- public string NMSCorrelationID
- {
- get
- {
- if ( null != this.correlationId)
- {
- return this.correlationId;
- }
- object objId = this.messageProperties.GetCorrelationId();
- if (objId != null)
- {
- // correlationId strings are returned as-is to the application, otherwise
- // convert it to a NMSMessageId string
- if (objId is string)
- {
- this.correlationId = objId as string;
- }
- else
- {
- this.correlationId = MessageSupport.CreateNMSMessageId(objId);
- }
- }
-
- return this.correlationId;
- }
- set
- {
- object objId = MessageSupport.CreateAMQPMessageId(value);
- this.messageProperties.SetCorrelationId(objId);
- this.correlationId = value;
- }
- }
-
- public MsgDeliveryMode NMSDeliveryMode
- {
- get
- {
- if (this.messageHeader.Durable)
- {
- return MsgDeliveryMode.Persistent;
- }
- else
- {
- return MsgDeliveryMode.NonPersistent;
- }
-
- }
- set
- {
- if (value.Equals(MsgDeliveryMode.Persistent))
- {
- this.messageHeader.Durable = true;
- }
- else
- {
- this.messageHeader.Durable = false;
- }
- }
- }
-
- public IDestination NMSDestination
- {
- get
- {
- if (destination == null && consumer != null)
- {
- object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST);
- if (typeObj != null)
- {
- byte type = Convert.ToByte(typeObj);
- destination = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type);
- if(destination == null)
- {
- destination = consumer.Destination;
- }
- }
- }
- return destination;
- }
- set
- {
- string destString = null;
- IDestination dest = null;
- if (value != null) {
- destString = UriUtil.GetAddress(value, Connection);
- dest = value;
- }
- this.messageProperties.To = destString;
- SetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST, MessageSupport.GetValueForDestination(dest));
- destination = dest;
- }
- }
-
- public string NMSMessageId
- {
- get
- {
- object objId = this.messageProperties.GetMessageId();
- if (this.msgId == null && objId != null)
- {
- this.msgId = MessageSupport.CreateNMSMessageId(objId);
- }
- return this.msgId;
- }
- set
- {
- object msgId = MessageSupport.CreateAMQPMessageId(value);
- //Tracer.InfoFormat("Set message Id to <{0}>: {1}", msgId.GetType().Name, msgId.ToString());
- this.messageProperties.SetMessageId(msgId);
- this.msgId = value;
- }
- }
-
- public MsgPriority NMSPriority
- {
- get { return MessageSupport.GetPriorityFromValue(this.messageHeader.Priority); }
- set
- {
- this.messageHeader.Priority = MessageSupport.GetValueForPriority(value);
- }
- }
-
- public bool NMSRedelivered
- {
- get
- {
- if (this.messageHeader.DeliveryCount > 0)
- {
- redelivered = true;
- }
- return redelivered;
- }
- set { redelivered = value; }
- }
-
- public IDestination NMSReplyTo
- {
- get
- {
- if (replyTo == null && IsReceived)
- {
- object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO);
- if (typeObj != null)
- {
- byte type = Convert.ToByte(typeObj);
- replyTo = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type, true);
- }
- }
- return replyTo;
- }
- set
- {
- IDestination dest = null;
- string destString = null;
- if (value != null)
- {
- destString = UriUtil.GetAddress(value, Connection);
- dest = value;
- SetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO, MessageSupport.GetValueForDestination(dest));
- }
- this.messageProperties.ReplyTo = destString;
-
- replyTo = dest;
- }
- }
-
- public DateTime NMSTimestamp
- {
- get { return messageProperties.CreationTime; }
- set
- {
- messageProperties.CreationTime = value;
- if (NMSTimeToLive != null && NMSTimeToLive != TimeSpan.Zero)
- {
- messageProperties.AbsoluteExpiryTime = value + timeToLive;
- }
- }
- }
-
- public TimeSpan NMSTimeToLive
- {
- get
- {
- if ( timeToLive != null)
- {
- return timeToLive;
- }
- if (messageProperties.AbsoluteExpiryTime == DateTime.MinValue)
- {
-
- timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(this.messageHeader.Ttl));
- return timeToLive;
- }
- else
- {
- return messageProperties.AbsoluteExpiryTime - NMSTimestamp;
- }
- }
- set
- {
- timeToLive = value;
- }
- }
-
- public string NMSType
- {
- get { return this.messageProperties.Subject; }
- set { this.messageProperties.Subject = value; }
- }
-
- public IPrimitiveMap Properties
- {
- get
- {
- if (properties == null)
- {
- InitApplicationProperties();
- properties = new AMQPPrimitiveMap(this.applicationProperties);
- propertyHelper = new MessagePropertyIntercepter(this, properties, readOnlyProperties);
- }
- return propertyHelper;
- }
- }
-
- public int DeliveryCount
- {
- get
- {
- return Convert.ToInt32(this.messageHeader.DeliveryCount);
- }
-
- set
- {
- this.messageHeader.DeliveryCount = Convert.ToUInt32(value);
- }
- }
-
- public int RedeliveryCount
- {
- get
- {
- return DeliveryCount - 1;
- }
-
- set
- {
- DeliveryCount = value + 1;
- }
- }
-
- public MessageAcknowledgementHandler AckHandler { get; set; }
-
- public void Acknowledge()
- {
- if (AckHandler != null)
- {
- if (connection.IsClosed)
- {
- throw new IllegalStateException("Can not acknowledge Message on closed connection.");
- }
-
- AckHandler.Acknowledge();
- AckHandler = null;
- }
- }
-
- public virtual void ClearBody()
- {
- Content = null;
- }
-
- public virtual void ClearProperties()
- {
- if (properties != null)
- {
- propertyHelper.Clear();
- }
- }
-
- public virtual IMessageCloak Copy()
- {
- IMessageCloak copy = null;
- switch(JMSMessageType)
- {
- case MessageSupport.JMS_TYPE_MSG:
- copy = new AMQPMessageCloak(connection);
- break;
- case MessageSupport.JMS_TYPE_BYTE:
- copy = new AMQPBytesMessageCloak(connection);
- break;
- case MessageSupport.JMS_TYPE_TXT:
- copy = new AMQPTextMessageCloak(connection);
- break;
- case MessageSupport.JMS_TYPE_MAP:
- copy = new AMQPMapMessageCloak(connection);
- break;
- case MessageSupport.JMS_TYPE_STRM:
- copy = new AMQPStreamMessageCloak(connection);
- break;
- case MessageSupport.JMS_TYPE_OBJ:
- copy = new AMQPObjectMessageCloak(connection, (this as AMQPObjectMessageCloak).Type);
- break;
- default:
- throw new NMSException("Fatal error Invalid JMS type.");
- }
-
- CopyInto(copy);
- return copy;
- }
-
- public object GetMessageAnnotation(string symbolKey)
- {
- Symbol sym = symbolKey;
- return GetMessageAnnotation(sym);
- }
-
- public void SetMessageAnnotation(string symbolKey, object value)
- {
- Symbol sym = symbolKey;
- SetMessageAnnotation(sym, value);
- }
-
- public object GetDeliveryAnnotation(string symbolKey)
- {
- Symbol sym = symbolKey;
- return GetDeliveryAnnotation(sym);
- }
-
- public void SetDeliveryAnnotation(string symbolKey, object value)
- {
- Symbol sym = symbolKey;
- SetDeliveryAnnotation(sym, value);
- }
-
- public string GetContentType()
- {
- return GetContentTypeSymbol();
- }
-
- public void SetContentType(string type)
- {
- SetContentType(new Symbol(type));
- }
-
- protected virtual Symbol GetContentTypeSymbol()
- {
- return this.messageProperties.ContentType;
- }
-
- protected virtual void SetContentType(Symbol type)
- {
- this.messageProperties.ContentType = type;
- }
-
- #endregion
-
- public override string ToString()
- {
- string result = string.Format("{0}:\n", this.GetType());
- result += string.Format("inner amqp message: \n{0}\n", AMQPMessageCloak.ToString(message));
- result += "NMS Fields = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
- if (prop.GetGetMethod(true).IsPublic)
- {
- try
- {
- Object val = prop.GetValue(this, null);
- if (val is IPrimitiveMap )
- {
- result += prop.Name + " = " + ConversionSupport.ToString(val as IPrimitiveMap) +",\n";
- }
- else
- {
- result += string.Format("{0} = {1},\n", prop.Name, val);
- }
- }catch(TargetInvocationException tie)
- {
- Tracer.InfoFormat("Failed to invoke Member field accessor: {0}, cause: {1}", prop.Name, tie);
- }
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- public static string ToString(Amqp.Message message)
- {
- if (message == null) return "null";
- string result = "Type="+ message.GetType().Name +":\n";
-
- if (message.Header != null)
- {
- result += "Message Header: " + message.Header.ToString() + "\n";
- }
-
- return result;
- }
- }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs
deleted file mode 100644
index 454b8e8..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Message.Factory;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- class AMQPMessageTransformation <T> : MessageTransformation where T:ConnectionInfo
- {
- protected readonly Connection connection;
- protected readonly MessageFactory<T> factory;
-
- public AMQPMessageTransformation(AMQPMessageFactory<T> fact) : base()
- {
- connection = fact.Parent;
- factory = fact;
- }
-
- protected override IBytesMessage DoCreateBytesMessage()
- {
- return factory.CreateBytesMessage();
- }
-
- protected override IMapMessage DoCreateMapMessage()
- {
- return factory.CreateMapMessage();
- }
-
- protected override IMessage DoCreateMessage()
- {
- return factory.CreateMessage();
- }
-
- protected override IObjectMessage DoCreateObjectMessage()
- {
- return factory.CreateObjectMessage(null);
- }
-
- protected override IStreamMessage DoCreateStreamMessage()
- {
- return factory.CreateStreamMessage();
- }
-
- protected override ITextMessage DoCreateTextMessage()
- {
- return factory.CreateTextMessage();
- }
-
- protected override void DoPostProcessMessage(IMessage message)
- {
- // nothing for now
- }
-
- protected override IDestination DoTransformDestination(IDestination destination)
- {
- return DestinationTransformation.Transform(connection, destination);
- }
- }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs
deleted file mode 100644
index 49b6a03..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Runtime.Serialization;
-using System.Runtime.Serialization.Formatters.Binary;
-using System.IO;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- using Amqp;
- using Cloak;
- using Util;
- using Util.Types;
-
- class AMQPObjectMessageCloak : AMQPMessageCloak, IObjectMessageCloak
- {
-
- public static AMQPObjectEncodingType DEFAULT_ENCODING_TYPE = AMQPObjectEncodingType.AMQP_TYPE;
-
- private IAMQPObjectSerializer objectSerializer;
-
- #region Contructors
- internal AMQPObjectMessageCloak(Apache.NMS.AMQP.Connection c, AMQPObjectEncodingType type) : base(c)
- {
- InitializeObjectSerializer(type);
- Body = null;
- }
-
- internal AMQPObjectMessageCloak(MessageConsumer mc, Amqp.Message message) : base(mc, message)
- {
- if (message.Properties.ContentType.Equals(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE))
- {
- if(message.MessageAnnotations.Map.ContainsKey(MessageSupport.JMS_JAVA_ENCODING)
- && message.MessageAnnotations.Map[MessageSupport.JMS_JAVA_ENCODING].Equals(SymbolUtil.BOOLEAN_TRUE))
- {
- InitializeObjectSerializer(AMQPObjectEncodingType.JAVA_SERIALIZABLE);
- }
- else
- {
- InitializeObjectSerializer(AMQPObjectEncodingType.DOTNET_SERIALIZABLE);
- }
- }
- else
- {
- InitializeObjectSerializer(AMQPObjectEncodingType.AMQP_TYPE);
- }
- }
-
- #endregion
-
- #region Internal Properties Fields
- internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_OBJ; } }
- #endregion
-
- #region Public IObjectMessageCloak Properties
- public AMQPObjectEncodingType Type { get { return this.objectSerializer.Type; } }
-
- public object Body
- {
- get
- {
- return this.objectSerializer.GetObject();
- }
- set
- {
- this.objectSerializer.SetObject(value);
- }
- }
-
- public override byte[] Content
- {
- get
- {
- return null;
- }
- set
- {
-
- }
- }
-
- #endregion
-
- #region IMessageCloak Copy Methods
-
- IObjectMessageCloak IObjectMessageCloak.Copy()
- {
- IObjectMessageCloak ocloak = new AMQPObjectMessageCloak(connection, this.objectSerializer.Type);
- this.CopyInto(ocloak);
- return ocloak;
- }
-
-
- protected override void CopyInto(IMessageCloak msg)
- {
- base.CopyInto(msg);
- if (msg is IObjectMessageCloak)
- {
- IObjectMessageCloak copy = msg as IObjectMessageCloak;
- if (copy is AMQPObjectMessageCloak)
- {
- this.objectSerializer.CopyInto((copy as AMQPObjectMessageCloak).objectSerializer);
- }
- else
- {
- this.objectSerializer.SetObject(copy.Body);
- }
- }
-
- }
-
- #endregion
-
- #region Private Methods
-
- private void InitializeObjectSerializer(AMQPObjectEncodingType type)
- {
- switch (type)
- {
- case AMQPObjectEncodingType.AMQP_TYPE:
- objectSerializer = new AMQPTypeSerializer(this);
- break;
- case AMQPObjectEncodingType.DOTNET_SERIALIZABLE:
- objectSerializer = new DotnetObjectSerializer(this);
- break;
- case AMQPObjectEncodingType.JAVA_SERIALIZABLE:
- objectSerializer = new JavaObjectSerializer(this);
- break;
- default:
- throw NMSExceptionSupport.Create(new ArgumentException("Unsupported object encoding."));
- }
- }
-
- #endregion
- }
-
- #region IAMQPObjectSerializer
-
- #region IAMQPObjectSerializer Interface
- internal interface IAMQPObjectSerializer
- {
- Amqp.Message Message { get; }
- void SetObject(object o);
- object GetObject();
-
- void CopyInto(IAMQPObjectSerializer serializer);
-
- AMQPObjectEncodingType Type { get; }
-
- }
-
- #endregion
-
- #region AMQP Type IAMQPObjectSerializer Implementation
- class AMQPTypeSerializer : IAMQPObjectSerializer
- {
-
- private readonly Amqp.Message amqpMessage;
- private readonly AMQPObjectMessageCloak message;
- internal AMQPTypeSerializer(AMQPObjectMessageCloak msg)
- {
- amqpMessage = msg.AMQPMessage;
- message = msg;
- msg.SetMessageAnnotation(MessageSupport.JMS_AMQP_TYPE_ENCODING, SymbolUtil.BOOLEAN_TRUE);
- }
-
- public Message Message { get { return amqpMessage; } }
-
- public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.AMQP_TYPE; } }
-
- public void CopyInto(IAMQPObjectSerializer serializer)
- {
- serializer.SetObject(GetObject());
- }
-
- public object GetObject()
- {
- RestrictedDescribed body = amqpMessage.BodySection;
- if(body == null)
- {
- return null;
- }
- else if (body is AmqpValue)
- {
- AmqpValue value = body as AmqpValue;
- return value.Value;
- }
- else if (body is Data)
- {
- return (body as Data).Binary;
- }
- else if (body is AmqpSequence)
- {
- return (body as AmqpSequence).List;
- }
- else
- {
- throw new IllegalStateException("Unexpected body type: " + body.GetType().Name);
- }
- }
-
- public void SetObject(object o)
- {
- if(o == null)
- {
- amqpMessage.BodySection = MessageSupport.NULL_AMQP_VALUE_BODY;
- }
- else if (IsNMSObjectTypeSupported(o))
- {
- object value = null;
- if(o is IList)
- {
- value = ConversionSupport.ListToAmqp(o as IList);
- }
- else if (o is IPrimitiveMap)
- {
- value = ConversionSupport.NMSMapToAmqp(o as IPrimitiveMap);
- }
- else
- {
- value = o;
- }
- // to copy the object being set encode a message then decode and take body
- Amqp.Message copy = new Amqp.Message(value);
- ByteBuffer buffer = copy.Encode();
- copy = Message.Decode(buffer);
-
- amqpMessage.BodySection = new AmqpValue { Value = copy.Body };
- }
- else
- {
- throw new ArgumentException("Encoding unexpected object type: " + o.GetType().Name);
- }
- }
-
- private bool IsNMSObjectTypeSupported(object o)
- {
- return ConversionSupport.IsNMSType(o) || o is List || o is Map || o is IPrimitiveMap || o is IList;
- }
- }
-
- #endregion
-
- #region Dotnet Serializable IAMQPObjectSerializer Implementation
-
- class DotnetObjectSerializer : IAMQPObjectSerializer
- {
- private readonly Amqp.Message amqpMessage;
- private readonly AMQPObjectMessageCloak message;
- internal DotnetObjectSerializer(AMQPObjectMessageCloak msg)
- {
- amqpMessage = msg.AMQPMessage;
- message = msg;
- msg.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- msg.SetMessageAnnotation(MessageSupport.JMS_DONET_ENCODING, SymbolUtil.BOOLEAN_TRUE);
- }
-
- public Message Message { get { return amqpMessage; } }
-
- public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.DOTNET_SERIALIZABLE; } }
-
- public void CopyInto(IAMQPObjectSerializer serializer)
- {
- serializer.SetObject(GetObject());
- }
-
- public object GetObject()
- {
- byte[] bin = null;
- if(Message.BodySection == null)
- {
- return null;
- }
- else if (Message.BodySection is Data)
- {
- Data data = Message.BodySection as Data;
- bin = data.Binary;
- }
- // TODO handle other body types.
-
- if (bin == null || bin.Length == 0)
- {
- return null;
- }
- else
- {
- return GetDeserializedObject(bin);
- }
-
- }
-
- public void SetObject(object o)
- {
-
- byte[] bin = GetSerializedObject(o);
- if(bin == null || bin.Length == 0)
- {
- amqpMessage.BodySection = MessageSupport.EMPTY_DATA;
- }
- else
- {
- amqpMessage.BodySection = new Data() { Binary = bin };
- }
-
- }
-
- private object GetDeserializedObject(byte[] binary)
- {
- object result = null;
-
- MemoryStream stream = null;
- IFormatter formatter = null;
- try
- {
- stream = new MemoryStream(binary);
- formatter = new BinaryFormatter();
- result = formatter.Deserialize(stream);
- }
- finally
- {
- stream?.Close();
- }
-
-
- return result;
-
- }
-
- private byte[] GetSerializedObject(object o)
- {
- if (o == null) return new byte[] { 0xac,0xed,0x00,0x05,0x70 };
- MemoryStream stream = null;
- IFormatter formatter = null;
- byte[] result = null;
- try
- {
- stream = new MemoryStream();
- formatter = new BinaryFormatter();
- formatter.Serialize(stream, o);
- result = stream.ToArray();
- }
- finally
- {
- if(stream!= null)
- {
- stream.Close();
- }
- }
-
- return result;
- }
- }
-
- #endregion
-
- #region Java Serializable IAMQPObjectSerializer Implementation
-
- class JavaObjectSerializer : IAMQPObjectSerializer
- {
- private readonly Amqp.Message amqpMessage;
- private readonly AMQPObjectMessageCloak message;
- internal JavaObjectSerializer(AMQPObjectMessageCloak msg)
- {
- amqpMessage = msg.AMQPMessage;
- message = msg;
- message.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- message.SetMessageAnnotation(MessageSupport.JMS_JAVA_ENCODING, SymbolUtil.BOOLEAN_TRUE);
- }
-
- public Message Message { get { return amqpMessage; } }
-
- public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.JAVA_SERIALIZABLE; } }
-
- public void CopyInto(IAMQPObjectSerializer serializer)
- {
- // TODO fix to copy java serialized object as binary.
- serializer.SetObject(GetObject());
- }
-
- public object GetObject()
- {
- throw new NotImplementedException("Java Serialized Object body Not Supported.");
- }
-
- public void SetObject(object o)
- {
- throw new NotImplementedException("Java Serialized Object body Not Supported.");
- }
- }
-
- #endregion // Java IAMQPObjectSerializer Impl
-
- #endregion // IAMQPObjectSerializer
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs
deleted file mode 100644
index 27dad32..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-
- using Cloak;
- using Util;
- using Factory;
- class AMQPStreamMessageCloak : AMQPMessageCloak, IStreamMessageCloak
- {
- private IList list;
- private int position = 0;
- internal AMQPStreamMessageCloak(Connection c):base(c)
- {
- list = InitializeEmptyBody(true);
- }
-
- internal AMQPStreamMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg)
- {
- if (msg.BodySection == null)
- {
- list = InitializeEmptyBody(true);
- }
- else if (msg.BodySection is AmqpSequence)
- {
- IList value = (msg.BodySection as AmqpSequence).List;
- if(value == null)
- {
- list = InitializeEmptyBody(true);
- }
- else
- {
- list = value;
- }
- }
- else if (msg.BodySection is AmqpValue)
- {
- object value = (msg.BodySection as AmqpValue).Value;
- if(value == null)
- {
- list = InitializeEmptyBody(false);
- }
- else if (value is IList)
- {
- list = value as IList;
- }
- else
- {
- throw new IllegalStateException("Unexpected amqp-value body content type: " + value.GetType().Name);
- }
- }
- else
- {
- throw new IllegalStateException("Unexpected message body type: " + msg.BodySection.GetType().Name);
- }
- }
-
- internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_STRM; } }
-
- #region Private Methods
-
- private List InitializeEmptyBody(bool isSequence)
- {
- List l = new List();
- if (isSequence)
- {
- AmqpSequence seq = new Amqp.Framing.AmqpSequence();
- message.BodySection = seq;
- seq.List = l;
-
- }
- else
- {
- Amqp.Framing.AmqpValue val = new Amqp.Framing.AmqpValue();
- val.Value = l;
- message.BodySection = val;
- }
- return l;
- }
-
- private bool IsEmpty { get { return list.Count <= 0; } }
-
- #endregion
-
- #region IStreamMessageCloak Methods
- public bool HasNext { get { return !IsEmpty && position < list.Count; } }
-
- public object Peek()
- {
- if(IsEmpty || position >= list.Count)
- {
- throw new EndOfStreamException("Attempt to read past the end of stream");
- }
- object value = list[position];
- if(value != null && value is byte[])
- {
- byte[] binary = value as byte[];
- byte[] bin = new byte[binary.Length];
- binary.CopyTo(bin, 0);
- value = bin;
- }
- return value;
- }
-
- public void Pop()
- {
- if (IsEmpty || position > list.Count)
- {
- throw new EndOfStreamException("Attempt to read past the end of stream");
- }
- position++;
- }
-
- public void Put(object value)
- {
- object entry = value;
- if (entry != null && entry is byte[])
- {
- byte[] bin = new byte[(entry as byte[]).Length];
- (entry as byte[]).CopyTo(bin, 0);
- entry = bin;
- }
- if (list.Add(entry) < 0)
- {
- throw NMSExceptionSupport.Create(string.Format("Failed to add {0} to stream.", entry.ToString()), null);
- }
- position++;
- }
-
- public void Reset()
- {
- position = 0;
- }
-
- public override void ClearBody()
- {
- base.ClearBody();
- list.Clear();
- position = 0;
- }
-
- IStreamMessageCloak IStreamMessageCloak.Copy()
- {
- return base.Copy() as IStreamMessageCloak;
- }
-
- protected override void CopyInto(IMessageCloak msg)
- {
- base.CopyInto(msg);
- if(msg is IStreamMessageCloak)
- {
- IStreamMessageCloak copy = (msg as IStreamMessageCloak);
-
- foreach(object o in list)
- {
- copy.Put(o);
- }
- }
- }
- public override string ToString()
- {
- string result = base.ToString();
- result += "\nMessage Body: {";
- foreach(object o in list)
- {
- //
- // handle byte arrays for now
- // add more special handlers as needed.
- //
- if (o is byte[])
- {
-
- result += string.Format("\n{0} len={1}: {2}", o.GetType(), (o as byte[]).Length, BitConverter.ToString(o as byte[]).Replace("-", " "));
- }
- else
- {
- result += string.Format("\n{0}: {1}", o.GetType(), o.ToString());
- }
- }
- result += "\n}";
- return result;
- }
- #endregion
- }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs
deleted file mode 100644
index dd093bc..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
- using Cloak;
- using Util;
- using Factory;
-
- class AMQPTextMessageCloak : AMQPMessageCloak, ITextMessageCloak
- {
- #region Constructor
-
- internal AMQPTextMessageCloak(Connection c) : base(c) {}
-
- internal AMQPTextMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg) {}
-
- #endregion
-
- internal override byte JMSMessageType
- {
- get
- {
- return MessageSupport.JMS_TYPE_TXT;
- }
- }
-
- public string Text
- {
- get
- {
- return GetTextFromBody();
- }
-
- set
- {
- AmqpValue val = new AmqpValue();
- val.Value = value;
- this.message.BodySection = val;
- }
- }
-
- ITextMessageCloak ITextMessageCloak.Copy()
- {
- ITextMessageCloak tcloak = new AMQPTextMessageCloak(connection);
- CopyInto(tcloak);
- return tcloak;
- }
-
- protected override void CopyInto(IMessageCloak msg)
- {
- base.CopyInto(msg);
- (msg as ITextMessageCloak).Text = Text;
- }
-
- private static string DecodeBinaryBody(byte[] body)
- {
- string result = string.Empty;
- if(body != null && body.Length > 0)
- {
- result = Encoding.UTF8.GetString(body);
- }
- return result;
- }
-
- private string GetTextFromBody()
- {
- string result = string.Empty;
- RestrictedDescribed body = this.message.BodySection;
- if(body == null)
- {
- return result;
- }
- else if (body is Data)
- {
- byte[] data = (body as Data).Binary;
- result = DecodeBinaryBody(data);
- }
- else if(body is AmqpValue)
- {
- object value = (body as AmqpValue).Value;
- if(value == null)
- {
- return result;
- }
- else if (value is byte[])
- {
- result = DecodeBinaryBody(value as byte[]);
- }
- else if (value is string)
- {
- result = value as string;
- }
- else
- {
- throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
- }
- }
- else
- {
- throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
- }
-
-
- return result;
- }
- public override string ToString()
- {
- string result = base.ToString();
- if (this.Text != null)
- {
- result += string.Format("\nMessage Body: {0}\n", Text);
- }
- return result;
- }
-
- }
-}
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/AckType.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/AckType.cs
index b85f947..248f0b3 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/AckType.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message
{
- interface ITextMessageCloak : IMessageCloak
+ public enum AckType
{
- string Text { get; set; }
- new ITextMessageCloak Copy();
+ ACCEPTED = 0,
+ REJECTED = 1,
+ RELEASED = 2,
+ MODIFIED_FAILED = 3,
+ MODIFIED_FAILED_UNDELIVERABLE = 4,
+ // Conceptual
+ DELIVERED = 5
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
similarity index 69%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
index 118b5ab..f165cfe 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,23 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
using System.IO;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- internal interface IBytesMessageCloak : IMessageCloak
+ public interface INmsBytesMessageFacade : INmsMessageFacade
{
- BinaryReader getDataReader();
- BinaryWriter getDataWriter();
-
- new IBytesMessageCloak Copy();
-
- int BodyLength { get; }
+ BinaryReader GetDataReader();
+ BinaryWriter GetDataWriter();
void Reset();
+ long BodyLength { get; }
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
index acb649a..18b9247 100644
--- a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,19 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- interface IMapMessageCloak : IMessageCloak
+ public interface INmsMapMessageFacade : INmsMessageFacade
{
IPrimitiveMap Map { get; }
- new IMapMessageCloak Copy();
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
similarity index 51%
rename from src/NMS.AMQP/Message/Cloak/IMessageCloak.cs
rename to src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
index 623449c..e91ef24 100644
--- a/src/NMS.AMQP/Message/Cloak/IMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
@@ -15,47 +15,29 @@
* limitations under the License.
*/
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- /// <summary>
- /// Provider specific Cloak Interface from provider implementation.
- /// </summary>
- interface IMessageCloak : IMessage
+ public interface INmsMessageFacade
{
- byte[] Content
- {
- get;
- set;
- }
-
- bool IsBodyReadOnly { get; set; }
-
- bool IsPropertiesReadOnly { get; set; }
-
- bool IsReceived { get; }
-
- IMessageCloak Copy();
-
- object GetMessageAnnotation(string symbolKey);
-
- void SetMessageAnnotation(string symbolKey, object value);
-
- object GetDeliveryAnnotation(string symbolKey);
-
- void SetDeliveryAnnotation(string symbolKey, object value);
-
+ NmsMessage AsMessage();
+ void ClearBody();
int DeliveryCount { get; set; }
-
int RedeliveryCount { get; set; }
-
- MessageAcknowledgementHandler AckHandler { get; set; }
-
+ void OnSend(TimeSpan producerTtl);
+ string NMSMessageId { get; set; }
+ IPrimitiveMap Properties { get; }
+ string NMSCorrelationID { get; set; }
+ IDestination NMSDestination { get; set; }
+ TimeSpan NMSTimeToLive { get; set; }
+ MsgDeliveryMode NMSDeliveryMode { get; set; }
+ MsgPriority NMSPriority { get; set; }
+ bool NMSRedelivered { get; set; }
+ IDestination NMSReplyTo { get; set; }
+ DateTime NMSTimestamp { get; set; }
+ string NMSType { get; set; }
+ DateTime Expiration { get; set; }
+ sbyte JmsMsgType { get; }
+ INmsMessageFacade Copy();
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
index b85f947..cea2edc 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,17 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- interface ITextMessageCloak : IMessageCloak
+ public interface INmsObjectMessageFacade : INmsMessageFacade
{
- string Text { get; set; }
- new ITextMessageCloak Copy();
+ object Body { get; set; }
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
similarity index 82%
copy from src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
index 8775d1b..d7e0523 100644
--- a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,26 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.IO;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- interface IStreamMessageCloak : IMessageCloak
+ public interface INmsStreamMessageFacade : INmsMessageFacade
{
-
-
- new IStreamMessageCloak Copy();
-
- bool HasNext { get; }
-
- void Reset();
-
- void Put(object value);
-
object Peek();
-
void Pop();
-
-
+ void Reset();
+ void Put(object value);
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
similarity index 77%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
index b85f947..0a1aaa9 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
@@ -14,17 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
{
- interface ITextMessageCloak : IMessageCloak
+ public interface INmsTextMessageFacade : INmsMessageFacade
{
string Text { get; set; }
- new ITextMessageCloak Copy();
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs b/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs
deleted file mode 100644
index 8b0268d..0000000
--- a/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Amqp;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
- using Cloak;
- using AMQP;
- class AMQPMessageFactory<T> : MessageFactory<T> where T : ConnectionInfo
- {
-
- protected readonly AMQPMessageTransformation<T> transformFactory;
- protected AMQPObjectEncodingType encodingType = AMQPObjectEncodingType.UNKOWN;
-
- internal AMQPMessageFactory(NMSResource<T> resource) : base(resource)
- {
- transformFactory = new AMQPMessageTransformation<T>(this);
- InitEncodingType();
- }
-
- internal MessageTransformation TransformFactory { get { return transformFactory; } }
-
- internal Connection Parent { get { return parent as Connection; } }
-
- public override MessageTransformation GetTransformFactory()
- {
- return transformFactory;
- }
-
- public override IBytesMessage CreateBytesMessage()
- {
- IBytesMessageCloak cloak = new AMQPBytesMessageCloak(Parent);
- return new BytesMessage(cloak);
- }
-
- public override IBytesMessage CreateBytesMessage(byte[] body)
- {
- IBytesMessage msg = CreateBytesMessage();
- msg.WriteBytes(body);
- return msg;
- }
-
- public override IMapMessage CreateMapMessage()
- {
- IMapMessageCloak cloak = new AMQPMapMessageCloak(Parent);
- return new MapMessage(cloak);
- }
-
- public override IMessage CreateMessage()
- {
- IMessageCloak cloak = new AMQPMessageCloak(Parent);
- return new Message(cloak);
- }
-
- public override IObjectMessage CreateObjectMessage(object body)
- {
- IObjectMessageCloak cloak = new AMQPObjectMessageCloak(Parent, encodingType);
- return new ObjectMessage(cloak) { Body=body };
- }
-
- public override IStreamMessage CreateStreamMessage()
- {
- IStreamMessageCloak cloak = new AMQPStreamMessageCloak(Parent);
- return new StreamMessage(cloak);
- }
-
- public override ITextMessage CreateTextMessage()
- {
- ITextMessageCloak cloak = new AMQPTextMessageCloak(Parent);
- return new TextMessage(cloak);
- }
-
- public override ITextMessage CreateTextMessage(string text)
- {
- ITextMessage msg = CreateTextMessage();
- msg.Text = text;
- return msg;
- }
-
- private void InitEncodingType()
- {
- encodingType = ConnectionEncodingType(Parent);
- Tracer.InfoFormat("Message Serialization for connection : {0}, is set to: {1}.", Parent.ClientId, encodingType.ToString());
- }
-
-
- private const string AMQP_TYPE = "amqp";
- private const string DOTNET_TYPE = "dotnet";
- private const string JAVA_TYPE = "java";
-
- private static AMQPObjectEncodingType ConnectionEncodingType(Connection connection)
- {
- string value = connection.Properties[Connection.MESSAGE_OBJECT_SERIALIZATION_PROP];
- if (value == null) return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
- if (value.ToLower().StartsWith(AMQP_TYPE))
- {
- return AMQPObjectEncodingType.AMQP_TYPE;
- }
- else if (value.ToLower().StartsWith(DOTNET_TYPE))
- {
- return AMQPObjectEncodingType.DOTNET_SERIALIZABLE;
- }
- else if (value.ToLower().StartsWith(JAVA_TYPE))
- {
- return AMQPObjectEncodingType.JAVA_SERIALIZABLE;
- }
- else
- {
- return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
- }
- }
-
- }
-}
diff --git a/src/NMS.AMQP/Message/Factory/IMessageFactory.cs b/src/NMS.AMQP/Message/Factory/IMessageFactory.cs
deleted file mode 100644
index ed9b2d7..0000000
--- a/src/NMS.AMQP/Message/Factory/IMessageFactory.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
- interface IMessageFactory
- {
- MessageTransformation GetTransformFactory();
-
- // Factory methods to create messages
-
- /// <summary>
- /// Creates a new message with an empty body
- /// </summary>
- IMessage CreateMessage();
-
- /// <summary>
- /// Creates a new text message with an empty body
- /// </summary>
- ITextMessage CreateTextMessage();
-
- /// <summary>
- /// Creates a new text message with the given body
- /// </summary>
- ITextMessage CreateTextMessage(string text);
-
- /// <summary>
- /// Creates a new Map message which contains primitive key and value pairs
- /// </summary>
- IMapMessage CreateMapMessage();
-
- /// <summary>
- /// Creates a new Object message containing the given .NET object as the body
- /// </summary>
- IObjectMessage CreateObjectMessage(object body);
-
- /// <summary>
- /// Creates a new binary message
- /// </summary>
- IBytesMessage CreateBytesMessage();
-
- /// <summary>
- /// Creates a new binary message with the given body
- /// </summary>
- IBytesMessage CreateBytesMessage(byte[] body);
-
- /// <summary>
- /// Creates a new stream message
- /// </summary>
- IStreamMessage CreateStreamMessage();
-
-
- }
-}
diff --git a/src/NMS.AMQP/Message/Factory/MessageFactory.cs b/src/NMS.AMQP/Message/Factory/MessageFactory.cs
deleted file mode 100644
index 49e9b43..0000000
--- a/src/NMS.AMQP/Message/Factory/MessageFactory.cs
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
- internal abstract class MessageFactory<T> : IMessageFactory where T : ResourceInfo
- {
- private static readonly IDictionary<Id, IMessageFactory> resgistry;
-
- static MessageFactory()
- {
- resgistry = new ConcurrentDictionary<Id, IMessageFactory>();
- }
-
- public static void Register(NMSResource<T> resource)
- {
- if (resource is Connection)
- {
- resgistry.Add(resource.Id, (new AMQPMessageFactory<ConnectionInfo>(resource as Connection)) as IMessageFactory);
- }
- else
- {
- throw new NMSException("Invalid Message Factory Type " + resource.GetType().FullName);
- }
- }
-
- public static void Unregister(NMSResource<T> resource)
- {
- if(resource != null && resource.Id != null)
- {
- if(!resgistry.Remove(resource.Id))
- {
- if(resgistry.ContainsKey(resource.Id))
- Tracer.WarnFormat("MessageFactory was not able to unregister resource {0}.", resource.Id);
- }
- }
- }
-
- public static IMessageFactory Instance(Connection resource)
- {
- IMessageFactory factory = null;
- resgistry.TryGetValue(resource.Id, out factory);
- if(factory == null)
- {
- throw new NMSException("Resource "+resource+" is not registered as message factory.");
- }
- return factory;
- }
-
- protected readonly NMSResource<T> parent;
-
- protected MessageFactory(NMSResource<T> resource)
- {
- parent = resource;
- }
-
- public abstract MessageTransformation GetTransformFactory();
- public abstract IMessage CreateMessage();
- public abstract ITextMessage CreateTextMessage();
- public abstract ITextMessage CreateTextMessage(string text);
- public abstract IMapMessage CreateMapMessage();
- public abstract IObjectMessage CreateObjectMessage(object body);
- public abstract IBytesMessage CreateBytesMessage();
- public abstract IBytesMessage CreateBytesMessage(byte[] body);
- public abstract IStreamMessage CreateStreamMessage();
-
- }
-}
diff --git a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs b/src/NMS.AMQP/Message/INmsMessageFactory.cs
similarity index 59%
copy from src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
copy to src/NMS.AMQP/Message/INmsMessageFactory.cs
index 8775d1b..7e48d34 100644
--- a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
+++ b/src/NMS.AMQP/Message/INmsMessageFactory.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,26 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.IO;
-namespace Apache.NMS.AMQP.Message.Cloak
+using Apache.NMS.AMQP.Provider.Amqp.Message;
+
+namespace Apache.NMS.AMQP.Message
{
- interface IStreamMessageCloak : IMessageCloak
+ public interface INmsMessageFactory
{
-
-
- new IStreamMessageCloak Copy();
-
- bool HasNext { get; }
-
- void Reset();
-
- void Put(object value);
-
- object Peek();
-
- void Pop();
-
+ NmsMessage CreateMessage();
+ NmsTextMessage CreateTextMessage();
+ NmsTextMessage CreateTextMessage(string payload);
+ NmsStreamMessage CreateStreamMessage();
+ NmsBytesMessage CreateBytesMessage();
+ NmsBytesMessage CreateBytesMessage(byte[] body);
+ NmsMapMessage CreateMapMessage();
+ NmsObjectMessage CreateObjectMessage();
+ NmsObjectMessage CreateObjectMessage(object body);
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
similarity index 64%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/Message/InboundMessageDispatch.cs
index 118b5ab..e069a64 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
@@ -14,23 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message
{
- internal interface IBytesMessageCloak : IMessageCloak
+ public class InboundMessageDispatch
{
- BinaryReader getDataReader();
- BinaryWriter getDataWriter();
+ public Id ConsumerId { get; set; }
+ public ConsumerInfo ConsumerInfo { get; set; }
+ public NmsMessage Message { get; set; }
+ public bool IsDelivered { get; set; }
- new IBytesMessageCloak Copy();
-
- int BodyLength { get; }
- void Reset();
+ public int RedeliveryCount => Message?.Facade.RedeliveryCount ?? 0;
+ public bool EnqueueFirst { get; set; }
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/MapMessage.cs b/src/NMS.AMQP/Message/MapMessage.cs
deleted file mode 100644
index 8679e3a..0000000
--- a/src/NMS.AMQP/Message/MapMessage.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Message.Cloak;
-
-namespace Apache.NMS.AMQP.Message
-{
- /// <summary>
- /// Apache.NMS.AMQP.Message.MapMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IMapMessage interface.
- /// Apache.NMS.AMQP.Message.MapMessage uses the Apache.NMS.AMQP.Message.Cloak.IMapMessageCloak interface to detach from the underlying AMQP 1.0 engine.
- /// </summary>
- class MapMessage : Message, IMapMessage
- {
- new private readonly IMapMessageCloak cloak;
- private PrimitiveMapInterceptor map;
-
- internal MapMessage(IMapMessageCloak message) : base(message)
- {
- cloak = message;
- }
-
- public override bool IsReadOnly
- {
- get { return base.IsReadOnly; }
- internal set
- {
- if (map != null)
- {
- map.ReadOnly = value;
- }
- base.IsReadOnly = value;
- }
- }
-
- public IPrimitiveMap Body
- {
- get
- {
- if(map == null)
- {
- map = new PrimitiveMapInterceptor(this, cloak.Map, IsReadOnly, true);
- }
- return map;
- }
- }
-
- internal override Message Copy()
- {
- return new MapMessage(cloak.Copy());
- }
- }
-}
diff --git a/src/NMS.AMQP/Message/Message.cs b/src/NMS.AMQP/Message/Message.cs
deleted file mode 100644
index b2ba218..0000000
--- a/src/NMS.AMQP/Message/Message.cs
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Message
-{
-
- using Cloak;
-
- internal enum AckType
- {
- ACCEPTED = 0,
- REJECTED = 1,
- RELEASED = 2,
- MODIFIED_FAILED = 3,
- MODIFIED_FAILED_UNDELIVERABLE = 4,
- }
-
- /// <summary>
- /// Apache.NMS.AMQP.Message.Message is the root message class that implements the Apache.NMS.IMessage interface.
- /// Apache.NMS.AMQP.Message.Message uses the Apache.NMS.AMQP.Message.Cloak.IMessageCloak interface to detach from the underlying AMQP 1.0 engine.
- /// </summary>
- class Message : IMessage
- {
-
- public static readonly string MESSAGE_VENDOR_ACK_PROP = PropertyUtil.CreateProperty("ACK.TYPE", "AMQP");
-
- protected readonly IMessageCloak cloak;
- private IPrimitiveMap propertyHelper = null;
-
- #region Constructors
-
- internal Message(IMessageCloak message)
- {
- this.cloak = message;
- }
-
- #endregion
-
- #region Protected Methods
-
- protected void FailIfReadOnlyMsgBody()
- {
- if(IsReadOnly == true)
- {
- throw new MessageNotWriteableException("Message is in Read-Only mode.");
- }
- }
-
- protected void FailIfWriteOnlyMsgBody()
- {
- if (IsReadOnly == false)
- {
- throw new MessageNotReadableException("Message is in Write-Only mode.");
- }
- }
-
- #endregion
-
- #region Public Properties
-
- public virtual byte[] Content
- {
- get
- {
- return cloak.Content;
- }
-
- set
- {
- cloak.Content = value;
- }
- }
-
- public virtual bool IsReadOnly
- {
- get { return cloak.IsBodyReadOnly; }
- internal set { cloak.IsBodyReadOnly = value; }
- }
-
- public virtual bool IsReadOnlyProperties
- {
- get { return cloak.IsPropertiesReadOnly; }
- internal set { cloak.IsPropertiesReadOnly = value; }
- }
-
- #endregion
-
- #region IMessage Properties
-
-
- public string NMSCorrelationID
- {
- get { return cloak.NMSCorrelationID; }
- set { cloak.NMSCorrelationID = value; }
- }
-
- public MsgDeliveryMode NMSDeliveryMode
- {
- get { return cloak.NMSDeliveryMode; }
- set { cloak.NMSDeliveryMode = value; }
- }
-
- public IDestination NMSDestination
- {
- get { return cloak.NMSDestination; }
- set { cloak.NMSDestination = value; }
- }
-
- public string NMSMessageId
- {
- get { return cloak.NMSMessageId; }
- set { cloak.NMSMessageId = value; }
- }
-
- public MsgPriority NMSPriority
- {
- get { return cloak.NMSPriority; }
- set { cloak.NMSPriority = value; }
- }
-
- public bool NMSRedelivered
- {
- get { return cloak.NMSRedelivered; }
- set { cloak.NMSRedelivered = value; }
- }
-
- public IDestination NMSReplyTo
- {
- get { return cloak.NMSReplyTo; }
- set { cloak.NMSReplyTo = value; }
- }
-
- public DateTime NMSTimestamp
- {
- get { return cloak.NMSTimestamp; }
- set { cloak.NMSTimestamp = value; }
- }
-
- public TimeSpan NMSTimeToLive
- {
- get { return cloak.NMSTimeToLive; }
- set { cloak.NMSTimeToLive = value; }
- }
-
- public string NMSType
- {
- get { return cloak.NMSType; }
- set { cloak.NMSType = value; }
- }
-
- public IPrimitiveMap Properties
- {
- get
- {
- if(propertyHelper == null)
- {
- propertyHelper = new NMSMessagePropertyInterceptor(this, cloak.Properties);
- }
- return propertyHelper;
- }
- }
-
- #endregion
-
- #region IMessage Methods
-
- public virtual void Acknowledge()
- {
- cloak.Acknowledge();
- }
-
- public virtual void ClearBody()
- {
- cloak.ClearBody();
- }
-
- public void ClearProperties()
- {
- propertyHelper.Clear();
- }
-
- #endregion
-
- #region Internal Methods
-
- internal IMessageCloak GetMessageCloak()
- {
- return cloak;
- }
-
- internal virtual Message Copy()
- {
- return new Message(this.cloak.Copy());
- }
-
- protected virtual void CopyInto(Message other)
- {
-
- }
-
- #endregion
-
- public override string ToString()
- {
- return base.ToString() + ":\n Impl Type: " + cloak.ToString();
- }
-
- }
-
- internal class MessageAcknowledgementHandler
- {
-
- private MessageConsumer consumer;
- private Session session;
- private Message message;
-
- private AckType? atype = null;
-
- public MessageAcknowledgementHandler(MessageConsumer mc, Message msg)
- {
- consumer = mc;
- session = consumer.Session;
- message = msg;
-
- }
-
- public AckType AcknowledgementType
- {
- get
- {
- return atype ?? MessageSupport.DEFAULT_ACK_TYPE;
- }
- set
- {
- atype = value;
- }
- }
-
- public void ClearAckType()
- {
- atype = null;
- }
-
- public bool IsAckTypeSet
- {
- get => atype != null;
- }
-
- public void Acknowledge()
- {
-
- if (session.AcknowledgementMode.Equals(AcknowledgementMode.IndividualAcknowledge))
- {
- consumer.AcknowledgeMessage(message, AcknowledgementType);
- }
- else // Session Ackmode AcknowledgementMode.ClientAcknowledge
- {
- session.Acknowledge(AcknowledgementType);
- }
- }
-
- }
-
-}
diff --git a/src/NMS.AMQP/Message/BytesMessage.cs b/src/NMS.AMQP/Message/NmsBytesMessage.cs
similarity index 52%
rename from src/NMS.AMQP/Message/BytesMessage.cs
rename to src/NMS.AMQP/Message/NmsBytesMessage.cs
index 8186214..62eaa5b 100644
--- a/src/NMS.AMQP/Message/BytesMessage.cs
+++ b/src/NMS.AMQP/Message/NmsBytesMessage.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,136 +14,69 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
using System.IO;
-using Apache.NMS;
+using Apache.NMS.AMQP.Message.Facade;
using Apache.NMS.Util;
namespace Apache.NMS.AMQP.Message
{
- using Cloak;
- /// <summary>
- /// Apache.NMS.AMQP.Message.BytesMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IBytesMessage interface.
- /// Apache.NMS.AMQP.Message.BytesMessage uses the Apache.NMS.AMQP.Message.Cloak.IBytesMessageCloak interface to detach from the underlying AMQP 1.0 engine.
- /// </summary>
- class BytesMessage : Message, IBytesMessage
+ public class NmsBytesMessage : NmsMessage, IBytesMessage
{
private BinaryWriter dataOut = null;
private BinaryReader dataIn = null;
- private MemoryStream outputBuffer = null;
- private readonly new IBytesMessageCloak cloak;
-
- #region Constructor
-
- internal BytesMessage(IBytesMessageCloak message) : base(message)
- {
- cloak = message;
- }
-
- #endregion
-
- internal override Message Copy()
- {
- return new BytesMessage(this.cloak.Copy());
- }
+ private readonly INmsBytesMessageFacade facade;
- #region Private Methods
-
- private void InitializeReadingMode()
+ public NmsBytesMessage(INmsBytesMessageFacade facade) : base(facade)
{
- FailIfWriteOnlyMsgBody();
- if(dataIn == null || dataIn.BaseStream == null)
- {
- dataIn = cloak.getDataReader();
- }
+ this.facade = facade;
}
- private void InitializeWritingMode()
+ public byte[] Content
{
- FailIfReadOnlyMsgBody();
- if(dataOut == null )
+ get
{
- dataOut = cloak.getDataWriter();
+ byte[] buffer = new byte [BodyLength];
+ ReadBytes(buffer);
+ return buffer;
}
+ set => WriteBytes(value);
}
- private void StoreContent()
+ public byte ReadByte()
{
- if(dataOut != null)
+ InitializeReading();
+ try
{
- dataOut.Close();
- base.Content = outputBuffer.ToArray();
-
- dataOut = null;
- outputBuffer = null;
+ return dataIn.ReadByte();
}
- }
-
- #endregion
-
- #region IBytesMessage Properties
-
- public override byte[] Content
- {
- get
+ catch (EndOfStreamException e)
{
- byte[] buffer = null;
- InitializeReadingMode();
- if(this.cloak.BodyLength != 0)
- {
- buffer = new byte[this.cloak.BodyLength];
- dataIn.Read(buffer, 0, buffer.Length);
- }
- return buffer;
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
}
- set
+ catch (IOException e)
{
- InitializeWritingMode();
- if(value != null)
- {
- this.dataOut.Write(value, 0, value.Length);
- }
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public long BodyLength
+ public void WriteByte(byte value)
{
- get
+ InitializeWriting();
+ try
{
- InitializeReadingMode();
- return this.cloak.BodyLength;
+ this.dataOut.Write(value);
+ }
+ catch (IOException e)
+ {
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
- }
-
- #endregion
-
- #region IBytesMessage Methods
-
- public override void ClearBody()
- {
- dataIn = null;
- dataOut = null;
- outputBuffer = null;
- IsReadOnly = false;
- base.ClearBody();
- }
-
- public void Reset()
- {
- dataIn = null;
- dataOut = null;
- outputBuffer = null;
- this.cloak.Reset();
- IsReadOnly = true;
}
public bool ReadBoolean()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
return dataIn.ReadBoolean();
@@ -158,29 +91,25 @@ namespace Apache.NMS.AMQP.Message
}
}
- public byte ReadByte()
+ public void WriteBoolean(bool value)
{
- InitializeReadingMode();
+ InitializeWriting();
try
{
- return dataIn.ReadByte();
- }
- catch (EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
+ this.dataOut.Write(value);
}
catch (IOException e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public int ReadBytes(byte[] value)
+ public char ReadChar()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
- return dataIn.Read(value, 0, value.Length);
+ return dataIn.ReadChar();
}
catch (EndOfStreamException e)
{
@@ -192,29 +121,25 @@ namespace Apache.NMS.AMQP.Message
}
}
- public int ReadBytes(byte[] value, int length)
+ public void WriteChar(char value)
{
- InitializeReadingMode();
+ InitializeWriting();
try
{
- return dataIn.Read(value, 0, length);
- }
- catch (EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
+ this.dataOut.Write(value);
}
catch (IOException e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public char ReadChar()
+ public short ReadInt16()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
- return dataIn.ReadChar();
+ return dataIn.ReadInt16();
}
catch (EndOfStreamException e)
{
@@ -226,29 +151,25 @@ namespace Apache.NMS.AMQP.Message
}
}
- public double ReadDouble()
+ public void WriteInt16(short value)
{
- InitializeReadingMode();
+ InitializeWriting();
try
{
- return dataIn.ReadDouble();
- }
- catch (EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
+ this.dataOut.Write(value);
}
catch (IOException e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public short ReadInt16()
+ public int ReadInt32()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
- return dataIn.ReadInt16();
+ return dataIn.ReadInt32();
}
catch (EndOfStreamException e)
{
@@ -260,26 +181,22 @@ namespace Apache.NMS.AMQP.Message
}
}
- public int ReadInt32()
+ public void WriteInt32(int value)
{
- InitializeReadingMode();
+ InitializeWriting();
try
{
- return dataIn.ReadInt32();
- }
- catch (EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
+ this.dataOut.Write(value);
}
catch (IOException e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
public long ReadInt64()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
return dataIn.ReadInt64();
@@ -294,30 +211,25 @@ namespace Apache.NMS.AMQP.Message
}
}
- public float ReadSingle()
+ public void WriteInt64(long value)
{
- InitializeReadingMode();
+ InitializeWriting();
try
{
- return dataIn.ReadSingle();
- }
- catch (EndOfStreamException e)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(e);
+ this.dataOut.Write(value);
}
catch (IOException e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public string ReadString()
+ public float ReadSingle()
{
- InitializeReadingMode();
+ InitializeReading();
try
{
- // Note if dataIn is an EndianBinaryReader the string length is read as 16bit short
- return dataIn.ReadString();
+ return dataIn.ReadSingle();
}
catch (EndOfStreamException e)
{
@@ -328,92 +240,90 @@ namespace Apache.NMS.AMQP.Message
throw NMSExceptionSupport.CreateMessageFormatException(e);
}
}
-
- public void WriteBoolean(bool value)
+
+ public void WriteSingle(float value)
{
- InitializeWritingMode();
+ InitializeWriting();
try
{
- dataOut.Write(value);
+ this.dataOut.Write(value);
}
- catch (Exception e)
+ catch (IOException e)
{
- throw NMSExceptionSupport.Create(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public void WriteByte(byte value)
+ public double ReadDouble()
{
- InitializeWritingMode();
+ InitializeReading();
try
{
- dataOut.Write(value);
+ return dataIn.ReadDouble();
}
- catch(Exception e)
+ catch (EndOfStreamException e)
{
- throw NMSExceptionSupport.Create(e);
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch (IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
}
-
}
- public void WriteBytes(byte[] value)
+ public void WriteDouble(double value)
{
- InitializeWritingMode();
+ InitializeWriting();
try
{
- dataOut.Write(value, 0, value.Length);
+ this.dataOut.Write(value);
}
- catch (Exception e)
+ catch (IOException e)
{
- throw NMSExceptionSupport.Create(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public void WriteBytes(byte[] value, int offset, int length)
+ public int ReadBytes(byte[] value)
{
- InitializeWritingMode();
- try
- {
- dataOut.Write(value, offset, length);
- }
- catch (Exception e)
- {
- throw NMSExceptionSupport.Create(e);
- }
+ return ReadBytes(value, value.Length);
}
- public void WriteChar(char value)
+ public int ReadBytes(byte[] value, int length)
{
- InitializeWritingMode();
- try
+ InitializeReading();
+
+ if (length < 0 || value.Length < length)
{
- dataOut.Write(value);
+ throw new IndexOutOfRangeException("length must not be negative or larger than the size of the provided array");
}
- catch (Exception e)
- {
- throw NMSExceptionSupport.Create(e);
- }
- }
- public void WriteDouble(double value)
- {
- InitializeWritingMode();
try
{
- dataOut.Write(value);
+ return dataIn.Read(value, 0, length);
}
- catch (Exception e)
+ catch (EndOfStreamException e)
{
- throw NMSExceptionSupport.Create(e);
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch (IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public void WriteInt16(short value)
+ public void WriteBytes(byte[] value)
+ {
+ WriteBytes(value, 0, value.Length);
+ }
+
+ public void WriteBytes(byte[] value, int offset, int length)
{
- InitializeWritingMode();
+ InitializeWriting();
+
try
{
- dataOut.Write(value);
+ dataOut.Write(value, offset, length);
}
catch (Exception e)
{
@@ -421,118 +331,123 @@ namespace Apache.NMS.AMQP.Message
}
}
- public void WriteInt32(int value)
+ public string ReadString()
{
- InitializeWritingMode();
+ InitializeReading();
try
{
- dataOut.Write(value);
+ return dataIn.ReadString();
}
- catch (Exception e)
+ catch (EndOfStreamException e)
{
- throw NMSExceptionSupport.Create(e);
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch (IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
}
}
- public void WriteInt64(long value)
+ public void WriteString(string value)
{
- InitializeWritingMode();
+ InitializeWriting();
try
{
- dataOut.Write(value);
+ this.dataOut.Write(value);
}
- catch (Exception e)
+ catch (IOException e)
{
- throw NMSExceptionSupport.Create(e);
+ NMSExceptionSupport.CreateMessageFormatException(e);
}
}
public void WriteObject(object value)
{
- InitializeWritingMode();
-
- Type objType = value.GetType();
- if(value is byte[])
- {
- dataOut.Write((byte[])value);
- }
- else if (objType.IsPrimitive)
- {
- if(value is Byte)
- {
- dataOut.Write((byte)value);
- }
- else if (value is Char)
- {
- dataOut.Write((char)value);
- }
- else if (value is Boolean)
- {
- dataOut.Write((bool)value);
- }
- else if (value is Int16)
- {
- dataOut.Write((short)value);
- }
- else if (value is Int32)
- {
- dataOut.Write((int)value);
- }
- else if (value is Int64)
- {
- dataOut.Write((long)value);
- }
- else if (value is Single)
- {
- dataOut.Write((float)value);
- }
- else if (value is Double)
- {
- dataOut.Write((double)value);
- }
- else if (value is String)
- {
- dataOut.Write((string) value);
- }
- else
- {
- throw new MessageFormatException("Cannot write primitive type:" + objType);
- }
- }
+ if (value == null)
+ throw new ArgumentNullException(nameof(value));
+
+ if (value is byte byteValue)
+ WriteByte(byteValue);
+ else if (value is char charValue)
+ WriteChar(charValue);
+ else if (value is bool boolValue)
+ WriteBoolean(boolValue);
+ else if (value is short shortValue)
+ WriteInt16(shortValue);
+ else if (value is int intValue)
+ WriteInt32(intValue);
+ else if (value is long longValue)
+ WriteInt64(longValue);
+ else if (value is float floatValue)
+ WriteSingle(floatValue);
+ else if (value is double doubleValue)
+ WriteDouble(doubleValue);
+ else if (value is string stringValue)
+ WriteString(stringValue);
+ else if (value is byte[] bytes)
+ WriteBytes(bytes);
else
- {
- throw new MessageFormatException("Cannot write non-primitive type:" + objType);
- }
-
+ throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType().FullName);
}
- public void WriteSingle(float value)
+ public override void ClearBody()
{
- InitializeWritingMode();
- try
- {
- dataOut.Write(value);
- }
- catch (Exception e)
+ base.ClearBody();
+ this.dataIn = null;
+ this.dataOut = null;
+ }
+
+ public override void OnSend(TimeSpan producerTtl)
+ {
+ Reset();
+ base.OnSend(producerTtl);
+ }
+
+ public void Reset()
+ {
+ this.facade.Reset();
+ this.dataOut = null;
+ this.dataIn = null;
+ IsReadOnlyBody = true;
+ }
+
+ public long BodyLength
+ {
+ get
{
- throw NMSExceptionSupport.Create(e);
+ InitializeReading();
+ return facade.BodyLength;
}
}
- public void WriteString(string value)
+ public override string ToString()
{
- InitializeWritingMode();
- try
+ return $"NmsBytesMessage {{ {Facade} }}";
+ }
+
+ private void InitializeWriting()
+ {
+ CheckReadOnlyBody();
+ if (dataOut == null)
{
- // note if dataOut is an EndianBinaryWriter, strings are written with a 16bit short length.
- dataOut.Write(value);
+ dataOut = facade.GetDataWriter();
}
- catch (Exception e)
+ }
+
+ private void InitializeReading()
+ {
+ CheckWriteOnlyBody();
+ if (dataIn?.BaseStream == null)
{
- throw NMSExceptionSupport.Create(e);
+ dataIn = facade.GetDataReader();
}
}
- #endregion
+ public override NmsMessage Copy()
+ {
+ NmsBytesMessage copy = new NmsBytesMessage(facade.Copy() as INmsBytesMessageFacade);
+ CopyInto(copy);
+ return copy;
+ }
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Util/AtomicSequence.cs b/src/NMS.AMQP/Message/NmsMapMessage.cs
similarity index 52%
copy from src/NMS.AMQP/Util/AtomicSequence.cs
copy to src/NMS.AMQP/Message/NmsMapMessage.cs
index fd59a94..0d74176 100644
--- a/src/NMS.AMQP/Util/AtomicSequence.cs
+++ b/src/NMS.AMQP/Message/NmsMapMessage.cs
@@ -14,42 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+
+using Apache.NMS.AMQP.Message.Facade;
using Apache.NMS.Util;
-namespace Apache.NMS.AMQP.Util
+namespace Apache.NMS.AMQP.Message
{
- /// <summary>
- /// Simple utility class used mainly for Id generation.
- /// </summary>
- class AtomicSequence : Atomic<ulong>
+ public class NmsMapMessage : NmsMessage, IMapMessage
{
- public AtomicSequence() : base()
- {
- }
+ private readonly INmsMapMessageFacade facade;
+ private PrimitiveMapInterceptor map;
- public AtomicSequence(ulong defaultValue) : base(defaultValue)
+ public NmsMapMessage(INmsMapMessageFacade facade) : base(facade)
{
+ this.facade = facade;
}
- public ulong getAndIncrement()
+ public override bool IsReadOnly
{
- ulong val = 0;
- lock (this)
+ get => base.IsReadOnly;
+ set
{
- val = atomicValue;
- atomicValue++;
+ if (map != null)
+ map.ReadOnly = value;
+
+ base.IsReadOnly = value;
}
- return val;
}
+ public IPrimitiveMap Body => map ?? (map = new PrimitiveMapInterceptor(this, facade.Map, IsReadOnly, true));
+
public override string ToString()
{
- return Value.ToString();
+ return $"NmsMapMessage {{ {Facade} }}";
+ }
+
+ public override NmsMessage Copy()
+ {
+ NmsMapMessage copy = new NmsMapMessage(facade.Copy() as INmsMapMessageFacade);
+ CopyInto(copy);
+ return copy;
}
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessage.cs b/src/NMS.AMQP/Message/NmsMessage.cs
new file mode 100644
index 0000000..1f81717
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsMessage.cs
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Message.Facade;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+ public class NmsMessage : IMessage
+ {
+ private MessagePropertyIntercepter properties;
+ private bool readOnlyProperties;
+
+ public NmsMessage(INmsMessageFacade facade)
+ {
+ Facade = facade;
+ }
+
+ public INmsMessageFacade Facade { get; }
+
+ public IPrimitiveMap Properties => properties ?? (properties = new MessagePropertyIntercepter(this, Facade.Properties, IsReadOnlyProperties));
+
+ public string NMSCorrelationID
+ {
+ get => Facade.NMSCorrelationID;
+ set => Facade.NMSCorrelationID = value;
+ }
+
+ public IDestination NMSDestination
+ {
+ get => Facade.NMSDestination;
+ set => Facade.NMSDestination = value;
+ }
+
+ public TimeSpan NMSTimeToLive
+ {
+ get => Facade.NMSTimeToLive;
+ set => Facade.NMSTimeToLive = value;
+ }
+
+ public string NMSMessageId
+ {
+ get => Facade.NMSMessageId;
+ set => Facade.NMSMessageId = value;
+ }
+
+ public MsgDeliveryMode NMSDeliveryMode
+ {
+ get => Facade.NMSDeliveryMode;
+ set => Facade.NMSDeliveryMode = value;
+ }
+
+ public MsgPriority NMSPriority
+ {
+ get => Facade.NMSPriority;
+ set => Facade.NMSPriority = value;
+ }
+
+ public bool NMSRedelivered
+ {
+ get => Facade.NMSRedelivered;
+ set => Facade.NMSRedelivered = value;
+ }
+
+ public IDestination NMSReplyTo
+ {
+ get => Facade.NMSReplyTo;
+ set => Facade.NMSReplyTo = value;
+ }
+
+ public DateTime NMSTimestamp
+ {
+ get => Facade.NMSTimestamp;
+ set => Facade.NMSTimestamp = value;
+ }
+
+ public string NMSType
+ {
+ get => Facade.NMSType;
+ set => Facade.NMSType = value;
+ }
+
+ public NmsAcknowledgeCallback NmsAcknowledgeCallback { get; set; }
+
+ public virtual bool IsReadOnly { get; set; }
+
+ public bool IsReadOnlyBody { get; set; }
+
+ public bool IsReadOnlyProperties
+ {
+ get => readOnlyProperties;
+ set
+ {
+ readOnlyProperties = value;
+ if (properties != null)
+ {
+ properties.ReadOnly = value;
+ }
+ }
+ }
+
+ public void Acknowledge()
+ {
+ if (NmsAcknowledgeCallback != null)
+ {
+ try
+ {
+ NmsAcknowledgeCallback.Acknowledge();
+ NmsAcknowledgeCallback = null;
+ }
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+ }
+
+ public virtual void ClearBody()
+ {
+ CheckReadOnly();
+ IsReadOnlyBody = false;
+ Facade.ClearBody();
+ }
+
+ public void ClearProperties()
+ {
+ CheckReadOnly();
+ IsReadOnlyProperties = false;
+ Properties.Clear();
+ }
+
+ public virtual void OnSend(TimeSpan producerTtl)
+ {
+ IsReadOnly = true;
+ Facade.OnSend(producerTtl);
+ }
+
+ public void OnDispatch()
+ {
+ IsReadOnly = false;
+ IsReadOnlyBody = true;
+ IsReadOnlyProperties = true;
+ }
+
+ public override string ToString()
+ {
+ return $"NmsMessage {{ {Facade} }}";
+ }
+
+ protected bool Equals(NmsMessage other)
+ {
+ if (other.NMSMessageId == null)
+ return false;
+
+ return string.Equals(NMSMessageId, other.NMSMessageId);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsMessage) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (NMSMessageId != null ? NMSMessageId.GetHashCode() : base.GetHashCode());
+ }
+
+ //----- State validation methods -----------------------------------------//
+
+ protected void CheckReadOnly()
+ {
+ if (IsReadOnly)
+ {
+ throw new MessageNotReadableException("Message is currently read-only");
+ }
+ }
+
+ protected void CheckWriteOnlyBody()
+ {
+ if (!IsReadOnlyBody)
+ {
+ throw new MessageNotReadableException("Message body is write-only");
+ }
+ }
+
+ protected void CheckReadOnlyBody()
+ {
+ if (IsReadOnly || IsReadOnlyBody)
+ {
+ throw new MessageNotWriteableException("Message body is read-only");
+ }
+ }
+
+ public bool IsExpired()
+ {
+ DateTime expireTime = Facade.Expiration;
+ return expireTime > DateTime.MinValue && DateTime.UtcNow > expireTime;
+ }
+
+ public virtual NmsMessage Copy()
+ {
+ NmsMessage copy = new NmsMessage(Facade.Copy());
+ CopyInto(copy);
+ return copy;
+ }
+
+ protected void CopyInto(NmsMessage target)
+ {
+ target.IsReadOnly = IsReadOnly;
+ target.IsReadOnlyBody = IsReadOnlyBody;
+ target.IsReadOnlyProperties = IsReadOnlyProperties;
+ target.NmsAcknowledgeCallback = NmsAcknowledgeCallback;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessageTransformation.cs b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
new file mode 100644
index 0000000..ef1acb1
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections;
+
+namespace Apache.NMS.AMQP.Message
+{
+ public static class NmsMessageTransformation
+ {
+ public static NmsMessage TransformMessage(INmsMessageFactory factory, IMessage message)
+ {
+ NmsMessage nmsMessage = null;
+
+ if (message is IBytesMessage bytesMessage)
+ {
+ bytesMessage.Reset();
+ NmsBytesMessage msg = factory.CreateBytesMessage();
+
+ try
+ {
+ while (true)
+ {
+ // Reads a byte from the message stream until the stream is empty
+ msg.WriteByte(bytesMessage.ReadByte());
+ }
+ }
+ catch (MessageEOFException)
+ {
+ // Indicates all the bytes have been read from the source.
+ }
+
+ nmsMessage = msg;
+ }
+ else if (message is IMapMessage mapMessage)
+ {
+ NmsMapMessage msg = factory.CreateMapMessage();
+ CopyMap(mapMessage.Body, msg.Body);
+ nmsMessage = msg;
+ }
+ else if (message is IObjectMessage objectMessage)
+ {
+ NmsObjectMessage msg = factory.CreateObjectMessage();
+ msg.Body = objectMessage.Body;
+ nmsMessage = msg;
+ }
+ else if (message is IStreamMessage streamMessage)
+ {
+ streamMessage.Reset();
+ NmsStreamMessage msg = factory.CreateStreamMessage();
+
+ try
+ {
+ while (true)
+ {
+ // Reads a byte from the message stream until the stream is empty
+ msg.WriteObject(streamMessage.ReadObject());
+ }
+ }
+ catch (MessageEOFException)
+ {
+ // Indicates all the stream have been read from the source.
+ }
+
+ nmsMessage = msg;
+ }
+ else if (message is ITextMessage textMessage)
+ {
+ NmsTextMessage msg = factory.CreateTextMessage();
+ msg.Text = textMessage.Text;
+ nmsMessage = msg;
+ }
+ else
+ nmsMessage = factory.CreateMessage();
+
+
+ CopyProperties(message, nmsMessage);
+ return nmsMessage;
+ }
+
+ private static void CopyProperties(IMessage source, NmsMessage target)
+ {
+ target.NMSMessageId = source.NMSMessageId;
+ target.NMSCorrelationID = source.NMSCorrelationID;
+ target.NMSDestination = source.NMSDestination;
+ target.NMSReplyTo = source.NMSReplyTo;
+ target.NMSDeliveryMode = source.NMSDeliveryMode;
+ target.NMSRedelivered = source.NMSRedelivered;
+ target.NMSType = source.NMSType;
+ target.NMSPriority = source.NMSPriority;
+ target.NMSTimestamp = source.NMSTimestamp;
+
+ CopyMap(source.Properties, target.Properties);
+ }
+
+ private static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
+ {
+ foreach (object key in source.Keys)
+ {
+ string name = key.ToString();
+ object value = source[name];
+
+ switch (value)
+ {
+ case bool boolValue:
+ target.SetBool(name, boolValue);
+ break;
+ case char charValue:
+ target.SetChar(name, charValue);
+ break;
+ case string stringValue:
+ target.SetString(name, stringValue);
+ break;
+ case byte byteValue:
+ target.SetByte(name, byteValue);
+ break;
+ case short shortValue:
+ target.SetShort(name, shortValue);
+ break;
+ case int intValue:
+ target.SetInt(name, intValue);
+ break;
+ case long longValue:
+ target.SetLong(name, longValue);
+ break;
+ case float floatValue:
+ target.SetFloat(name, floatValue);
+ break;
+ case double doubleValue:
+ target.SetDouble(name, doubleValue);
+ break;
+ case byte[] bytesValue:
+ target.SetBytes(name, bytesValue);
+ break;
+ case IList listValue:
+ target.SetList(name, listValue);
+ break;
+ case IDictionary dictionaryValue:
+ target.SetDictionary(name, dictionaryValue);
+ break;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/ObjectMessage.cs b/src/NMS.AMQP/Message/NmsObjectMessage.cs
similarity index 55%
copy from src/NMS.AMQP/Message/ObjectMessage.cs
copy to src/NMS.AMQP/Message/NmsObjectMessage.cs
index d2a7054..556f9c2 100644
--- a/src/NMS.AMQP/Message/ObjectMessage.cs
+++ b/src/NMS.AMQP/Message/NmsObjectMessage.cs
@@ -14,60 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
-using System.Runtime.Serialization;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
+using Apache.NMS.AMQP.Message.Facade;
namespace Apache.NMS.AMQP.Message
{
- using Cloak;
- class ObjectMessage : Message, IObjectMessage
+ public class NmsObjectMessage : NmsMessage, IObjectMessage
{
- protected new readonly IObjectMessageCloak cloak;
- internal ObjectMessage(IObjectMessageCloak message) : base(message)
- {
- this.cloak = message;
- }
+ private readonly INmsObjectMessageFacade facade;
- public new byte[] Content
+ public NmsObjectMessage(INmsObjectMessageFacade facade) : base(facade)
{
- get
- {
- return cloak.Content;
- }
-
- set
- {
-
- }
+ this.facade = facade;
}
public object Body
{
- get
- {
- return cloak.Body;
- }
+ get => this.facade.Body;
set
{
-
+ CheckReadOnlyBody();
try
{
- cloak.Body = value;
+ this.facade.Body = value;
}
- catch (SerializationException se)
+ catch (Exception e)
{
- throw NMSExceptionSupport.CreateMessageFormatException(se);
+ throw new MessageFormatException("Failed to serialize object", e);
}
-
}
}
-
- internal override Message Copy()
+
+ public override string ToString()
+ {
+ return $"NmsObjectMessage {{ {Facade} }}";
+ }
+
+ public override NmsMessage Copy()
{
- return new ObjectMessage(this.cloak.Copy());
+ NmsObjectMessage copy = new NmsObjectMessage(facade.Copy() as INmsObjectMessageFacade);
+ CopyInto(copy);
+ return copy;
}
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsStreamMessage.cs b/src/NMS.AMQP/Message/NmsStreamMessage.cs
new file mode 100644
index 0000000..03d6150
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsStreamMessage.cs
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.Runtime.InteropServices.ComTypes;
+using Apache.NMS.AMQP.Message.Facade;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+ public class NmsStreamMessage : NmsMessage, IStreamMessage
+ {
+ private const int NO_BYTES_IN_FLIGHT = -1;
+ private readonly INmsStreamMessageFacade facade;
+
+ private int remainingBytes = NO_BYTES_IN_FLIGHT;
+ private byte[] bytes;
+
+ public NmsStreamMessage(INmsStreamMessageFacade facade) : base(facade)
+ {
+ this.facade = facade;
+ }
+
+ public override string ToString()
+ {
+ return "NmsStreamMessage { " + facade + " }";
+ }
+
+ public bool ReadBoolean()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ bool result;
+ object value = facade.Peek();
+
+ if (value is bool boolResult)
+ result = boolResult;
+ else if (value is string stringValue)
+ result = stringValue.Equals(bool.TrueString);
+ else if (value is null)
+ result = false;
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to boolean.");
+
+ facade.Pop();
+ return result;
+ }
+
+ private void CheckBytesInFlight()
+ {
+ if (remainingBytes != NO_BYTES_IN_FLIGHT)
+ {
+ throw new MessageFormatException("Partially read byte[] entry still being retrieved using readBytes(byte[] dest)");
+ }
+ }
+
+ public byte ReadByte()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ byte result;
+ object value = facade.Peek();
+ if (value is byte byteValue)
+ result = byteValue;
+ else if (value is string stringValue && byte.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to byte.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to byte.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public int ReadBytes(byte[] value)
+ {
+ CheckWriteOnlyBody();
+
+ if (value == null)
+ {
+ throw new ArgumentNullException(nameof(value), "Target byte array was null");
+ }
+
+ if (remainingBytes == NO_BYTES_IN_FLIGHT)
+ {
+ object data = facade.Peek();
+ if (data == null)
+ {
+ return -1;
+ }
+ else if (!(data is byte[]))
+ {
+ throw new MessageFormatException("Next stream value is not a byte array.");
+ }
+ bytes = data as byte[];
+ remainingBytes = bytes.Length;
+ }
+ else if (remainingBytes == 0)
+ {
+ // We previously read all the bytes, but must have filled the destination array.
+ remainingBytes = NO_BYTES_IN_FLIGHT;
+ bytes = null;
+ facade.Pop();
+ return -1;
+ }
+
+ int previouslyRead = bytes.Length - remainingBytes;
+ int lengthToCopy = Math.Min(value.Length, remainingBytes);
+
+ if (lengthToCopy > 0)
+ {
+ Array.Copy(bytes, previouslyRead, value, 0, lengthToCopy);
+ }
+
+ remainingBytes -= lengthToCopy;
+
+ if (remainingBytes == 0 && lengthToCopy < value.Length)
+ {
+ // All bytes have been read and the destination array was not filled on this
+ // call, so the return will enable the caller to determine completion immediately.
+ remainingBytes = NO_BYTES_IN_FLIGHT;
+ bytes = null;
+ facade.Pop();
+ }
+
+ return lengthToCopy;
+ }
+
+ public char ReadChar()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ char result;
+ object value = facade.Peek();
+ if (value is char charValue)
+ result = charValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to char.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to char.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public short ReadInt16()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ short result;
+ object value = facade.Peek();
+ if (value is short shortValue)
+ result = shortValue;
+ else if (value is byte byteValue)
+ result = byteValue;
+ else if (value is string stringValue && short.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to short.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to short.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public int ReadInt32()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ int result;
+ object value = facade.Peek();
+ if (value is int intValue)
+ result = intValue;
+ else if (value is short shortValue)
+ result = shortValue;
+ else if (value is byte byteValue)
+ result = byteValue;
+ else if (value is string stringValue && int.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to int.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to int.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public long ReadInt64()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ long result;
+ object value = facade.Peek();
+ if (value is long longValue)
+ result = longValue;
+ else if (value is int intValue)
+ result = intValue;
+ else if (value is short shortValue)
+ result = shortValue;
+ else if (value is byte byteValue)
+ result = byteValue;
+ else if (value is string stringValue && long.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to long.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to long.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public float ReadSingle()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ float result;
+ object value = facade.Peek();
+ if (value is float floatValue)
+ result = floatValue;
+ else if (value is string stringValue && float.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to float.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to float.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public double ReadDouble()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ double result;
+ object value = facade.Peek();
+ if (value is double doubleValue)
+ result = doubleValue;
+ else if (value is float floatValue)
+ result = floatValue;
+ else if (value is string stringValue && float.TryParse(stringValue, out var parsedValue))
+ result = parsedValue;
+ else if (value is null)
+ throw new NullReferenceException("Cannot convert null value to long.");
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to double.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public string ReadString()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ string result;
+ object value = facade.Peek();
+ if (value == null)
+ result = null;
+ else if (value is string stringValue)
+ result = stringValue;
+ else if (value is float floatValue)
+ result = floatValue.ToString(CultureInfo.InvariantCulture);
+ else if (value is double doubleValue)
+ result = doubleValue.ToString(CultureInfo.InvariantCulture);
+ else if (value is long longValue)
+ result = longValue.ToString();
+ else if (value is int intValue)
+ result = intValue.ToString();
+ else if (value is short shortValue)
+ result = shortValue.ToString();
+ else if (value is byte byteValue)
+ result = byteValue.ToString();
+ else if (value is bool boolValue)
+ result = boolValue.ToString();
+ else if (value is char charValue)
+ result = charValue.ToString();
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to int.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public object ReadObject()
+ {
+ CheckWriteOnlyBody();
+ CheckBytesInFlight();
+
+ object result;
+ object value = facade.Peek();
+ if (value == null)
+ result = null;
+ else if (value is string)
+ result = value;
+ else if (value is float)
+ result = value;
+ else if (value is double)
+ result = value;
+ else if (value is long)
+ result = value;
+ else if (value is int)
+ result = value;
+ else if (value is short)
+ result = value;
+ else if (value is byte)
+ result = value;
+ else if (value is bool)
+ result = value;
+ else if (value is char)
+ result = value;
+ else if (value is byte[] original)
+ {
+ byte[] bytesResult = new byte[original.Length];
+ Array.Copy(original, 0, bytesResult, 0, original.Length);
+ result = bytesResult;
+ }
+ else
+ throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to int.");
+
+ facade.Pop();
+ return result;
+ }
+
+ public void WriteBoolean(bool value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteByte(byte value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteBytes(byte[] value)
+ {
+ WriteBytes(value, 0, value.Length);
+ }
+
+ public void WriteBytes(byte[] value, int offset, int length)
+ {
+ CheckReadOnlyBody();
+
+ byte[] entry = new byte[length];
+ Array.Copy(value, offset, entry, 0, length);
+ facade.Put(entry);
+ }
+
+ public void WriteChar(char value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteInt16(short value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteInt32(int value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteInt64(long value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteSingle(float value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteDouble(double value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteString(string value)
+ {
+ CheckReadOnlyBody();
+ facade.Put(value);
+ }
+
+ public void WriteObject(object value)
+ {
+ CheckReadOnlyBody();
+
+ switch (value)
+ {
+ case null:
+ case string _:
+ case char _:
+ case bool _:
+ case byte _:
+ case short _:
+ case int _:
+ case long _:
+ case float _:
+ case double _:
+ facade.Put(value);
+ break;
+ case byte[] bytesValue:
+ WriteBytes(bytesValue);
+ break;
+ default:
+ throw new MessageFormatException("Unsupported Object type: " + value.GetType().Name);
+ }
+ }
+
+ public override void ClearBody()
+ {
+ base.ClearBody();
+ bytes = null;
+ remainingBytes = NO_BYTES_IN_FLIGHT;
+ }
+
+ public void Reset()
+ {
+ bytes = null;
+ remainingBytes = NO_BYTES_IN_FLIGHT;
+ IsReadOnlyBody = true;
+ facade.Reset();
+ }
+
+ public override NmsMessage Copy()
+ {
+ NmsStreamMessage copy = new NmsStreamMessage(facade.Copy() as INmsStreamMessageFacade);
+ CopyInto(copy);
+ return copy;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Util/AtomicSequence.cs b/src/NMS.AMQP/Message/NmsTextMessage.cs
similarity index 58%
copy from src/NMS.AMQP/Util/AtomicSequence.cs
copy to src/NMS.AMQP/Message/NmsTextMessage.cs
index fd59a94..9334a36 100644
--- a/src/NMS.AMQP/Util/AtomicSequence.cs
+++ b/src/NMS.AMQP/Message/NmsTextMessage.cs
@@ -14,42 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS.Util;
-namespace Apache.NMS.AMQP.Util
+using Apache.NMS.AMQP.Message.Facade;
+
+namespace Apache.NMS.AMQP.Message
{
- /// <summary>
- /// Simple utility class used mainly for Id generation.
- /// </summary>
- class AtomicSequence : Atomic<ulong>
+ public class NmsTextMessage : NmsMessage, ITextMessage
{
- public AtomicSequence() : base()
- {
- }
+ private readonly INmsTextMessageFacade facade;
- public AtomicSequence(ulong defaultValue) : base(defaultValue)
+ public NmsTextMessage(INmsTextMessageFacade facade) : base(facade)
{
+ this.facade = facade;
}
- public ulong getAndIncrement()
+ public string Text
{
- ulong val = 0;
- lock (this)
+ get => facade.Text;
+ set
{
- val = atomicValue;
- atomicValue++;
+ CheckReadOnlyBody();
+ facade.Text = value;
}
- return val;
}
public override string ToString()
{
- return Value.ToString();
+ return "NmsTextMessage { " + Text + " }";
+ }
+
+ public override NmsMessage Copy()
+ {
+ NmsTextMessage copy = new NmsTextMessage(facade.Copy() as INmsTextMessageFacade);
+ CopyInto(copy);
+ return copy;
}
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
similarity index 71%
copy from src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
copy to src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index acb649a..f69b83f 100644
--- a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -14,19 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-namespace Apache.NMS.AMQP.Message.Cloak
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
{
- interface IMapMessageCloak : IMessageCloak
+ public class OutboundMessageDispatch
{
- IPrimitiveMap Map { get; }
- new IMapMessageCloak Copy();
+ public Id ProducerId { get; set; }
+ public ProducerInfo ProducerInfo { get; set; }
+ public NmsMessage Message { get; set; }
+ public bool SendAsync { get; set; }
}
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/StreamMessage.cs b/src/NMS.AMQP/Message/StreamMessage.cs
deleted file mode 100644
index 3891df6..0000000
--- a/src/NMS.AMQP/Message/StreamMessage.cs
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
-
-namespace Apache.NMS.AMQP.Message
-{
- using Cloak;
- class StreamMessage : Message, IStreamMessage
- {
-
- private const int NO_BYTES_IN_BUFFER = -1;
-
- private readonly new IStreamMessageCloak cloak;
-
- private int RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
-
- private byte[] Buffer = null;
-
- internal StreamMessage(IStreamMessageCloak message) : base(message)
- {
- cloak = message;
- }
-
- #region IStreamMessage Methods
-
- public bool ReadBoolean()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- bool result;
- object value = cloak.Peek();
- if(value == null)
- {
- result = Convert.ToBoolean(value);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<bool>(value);
- }
- cloak.Pop();
- return result;
- }
-
- public byte ReadByte()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- byte result;
- object value = cloak.Peek();
- if(value == null)
- {
- result = Convert.ToByte(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<byte>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public int ReadBytes(byte[] value)
- {
- FailIfWriteOnlyMsgBody();
- if (value == null)
- {
- throw new NullReferenceException("Target byte array is null.");
- }
- if (RemainingBytesInBuffer == NO_BYTES_IN_BUFFER)
- {
- object data = cloak.Peek();
- if (data == null)
- {
- return -1;
- }
- else if (!(data is byte[]))
- {
- throw new MessageFormatException("Next stream value is not a byte array.");
- }
- Buffer = data as byte[];
- RemainingBytesInBuffer = Buffer.Length;
- }
- int bufferOffset = Buffer.Length - RemainingBytesInBuffer;
- int copyLength = Math.Min(value.Length, RemainingBytesInBuffer);
- if(copyLength > 0)
- Array.Copy(Buffer, bufferOffset, value, 0, copyLength);
- RemainingBytesInBuffer -= copyLength;
- if(RemainingBytesInBuffer == 0)
- {
- RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
- Buffer = null;
- cloak.Pop();
- }
- return copyLength;
- }
-
- public char ReadChar()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- char result;
- object value = cloak.Peek();
- if (value == null)
- {
- throw new NullReferenceException("Cannot convert NULL value to char.");
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<char>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public double ReadDouble()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- double result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToDouble(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<double>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public short ReadInt16()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- short result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToInt16(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<short>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public int ReadInt32()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- int result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToInt32(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<int>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public long ReadInt64()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- long result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToInt64(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<long>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public object ReadObject()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- object result = null;
- object value = null;
- try
- {
- value = cloak.Peek();
- if (value == null)
- {
- result = null;
- }
- else if (value is byte[])
- {
- byte[] buffer = value as byte[];
- result = new byte[buffer.Length];
- Array.Copy(buffer, 0, result as byte[], 0, buffer.Length);
- }
- else if (ConversionSupport.IsNMSType(value))
- {
- result = value;
- }
- }
- catch (EndOfStreamException eos)
- {
- throw NMSExceptionSupport.CreateMessageEOFException(eos);
- }
- catch (IOException ioe)
- {
- throw NMSExceptionSupport.CreateMessageFormatException(ioe);
- }
- catch (Exception e)
- {
- Tracer.InfoFormat("Unexpected exception caught reading Object stream. Exception = {0}", e);
- throw NMSExceptionSupport.Create("Unexpected exception caught reading Object stream.", e);
- }
- cloak.Pop();
- return result;
- }
-
- public float ReadSingle()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- float result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToSingle(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<float>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public string ReadString()
- {
- FailIfWriteOnlyMsgBody();
- FailIfBytesInBuffer();
- string result;
- object value = cloak.Peek();
- if (value == null)
- {
- result = Convert.ToString(null);
- }
- else
- {
- result = ConversionSupport.ConvertNMSType<string>(value);
- }
-
- cloak.Pop();
- return result;
- }
-
- public void Reset()
- {
- RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
- Buffer = null;
- IsReadOnly = true;
- cloak.Reset();
- }
-
- public override void ClearBody()
- {
- RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
- Buffer = null;
- IsReadOnly = false;
-
- base.ClearBody();
- }
-
- public void WriteBoolean(bool value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteByte(byte value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteBytes(byte[] value)
- {
- WriteBytes(value, 0, value.Length);
- }
-
- public void WriteBytes(byte[] value, int offset, int length)
- {
- FailIfReadOnlyMsgBody();
- byte[] entry = new byte[length];
- Array.Copy(value, offset, entry, 0, length);
- cloak.Put(entry);
- }
-
- public void WriteChar(char value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteDouble(double value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteInt16(short value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteInt32(int value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteInt64(long value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteObject(object value)
- {
- FailIfReadOnlyMsgBody();
- if(value == null)
- {
- cloak.Put(value);
- }
- else if(value is byte[])
- {
- WriteBytes(value as byte[]);
- }
- else if (ConversionSupport.IsNMSType(value))
- {
- cloak.Put(value);
- }
- else
- {
- throw NMSExceptionSupport.CreateMessageFormatException(new Exception("Unsupported Object type: " + value.GetType().Name));
- }
- }
-
- public void WriteSingle(float value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- public void WriteString(string value)
- {
- FailIfReadOnlyMsgBody();
- cloak.Put(value);
- }
-
- #endregion
-
- #region Validation Methods
-
- protected void FailIfBytesInBuffer()
- {
- if(RemainingBytesInBuffer != NO_BYTES_IN_BUFFER)
- {
- throw new MessageFormatException("Unfinished Buffered read for ReadBytes(byte[] value)");
- }
- }
-
- #endregion
-
- internal override Message Copy()
- {
- return new StreamMessage(this.cloak.Copy());
- }
- }
-}
diff --git a/src/NMS.AMQP/Message/TextMessage.cs b/src/NMS.AMQP/Message/TextMessage.cs
deleted file mode 100644
index e7a6169..0000000
--- a/src/NMS.AMQP/Message/TextMessage.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-
-namespace Apache.NMS.AMQP.Message
-{
- using Cloak;
- /// <summary>
- /// Apache.NMS.AMQP.Message.TextMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.ITextMessage interface.
- /// Apache.NMS.AMQP.Message.TextMessage uses the Apache.NMS.AMQP.Message.Cloak.ITextMessageCloak interface to detach from the underlying AMQP 1.0 engine.
- /// </summary>
- class TextMessage : Message, ITextMessage
- {
- protected readonly new ITextMessageCloak cloak;
-
- #region Constructor
-
- internal TextMessage(ITextMessageCloak cloak) : base(cloak)
- {
- this.cloak = cloak;
- }
-
- #endregion
-
- #region ITextMessage Properties
-
- public string Text
- {
- get
- {
- return cloak.Text;
- }
-
- set
- {
- FailIfReadOnlyMsgBody();
- cloak.Text = value;
- }
- }
-
- #endregion
-
- internal override Message Copy()
- {
- return new TextMessage(this.cloak.Copy());
- }
- }
-}
diff --git a/src/NMS.AMQP/MessageConsumer.cs b/src/NMS.AMQP/MessageConsumer.cs
deleted file mode 100644
index 2ee9312..0000000
--- a/src/NMS.AMQP/MessageConsumer.cs
+++ /dev/null
@@ -1,977 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Amqp;
-using Amqp.Framing;
-using Apache.NMS;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Util.Types.Queue;
-using Apache.NMS.AMQP.Message.Cloak;
-
-namespace Apache.NMS.AMQP
-{
- /// <summary>
- /// MessageConsumer Implement the <see cref="Apache.NMS.IMessageConsumer"/> interface.
- /// This class configures and manages an amqp receiver link.
- /// The Message consumer can be configured to receive message asynchronously or synchronously.
- /// </summary>
- class MessageConsumer : MessageLink, IMessageConsumer
- {
- // The Executor threshold limits the number of message listener dispatch events that can be on the session's executor at given time.
- private const int ExecutorThreshold = 2;
- private ConsumerInfo consumerInfo;
- private readonly string selector;
- private Apache.NMS.Util.Atomic<bool> hasStarted = new Apache.NMS.Util.Atomic<bool>(false);
- private MessageCallback OnInboundAMQPMessage;
- private IMessageQueue messageQueue;
- private LinkedList<IMessageDelivery> delivered;
- private System.Threading.ManualResetEvent MessageListenerInUseEvent = new System.Threading.ManualResetEvent(true);
- // pending Message delivery tasks counts the number of pending tasks on the Session's executor.
- // this should optimize the number of delivery task on the executor thread.
- private volatile int pendingMessageDeliveryTasks = 0;
- private volatile int MaxPendingTasks = 0;
-
- // stat counters useful for debuging
- // TODO create statistic container for counters maybe use ConsumerInfo?
- private int transportMsgCount = 0;
- private int messageDispatchCount = 0;
-
- #region Constructor
-
- internal MessageConsumer(Session ses, Destination dest) : this(ses, dest, null, null)
- {
- }
-
- internal MessageConsumer(Session ses, IDestination dest) : this(ses, dest, null, null)
- {
- }
-
- internal MessageConsumer(Session ses, IDestination dest, string name, string selector, bool noLocal = false) : base(ses, dest)
- {
- this.selector = selector;
- consumerInfo = new ConsumerInfo(ses.ConsumerIdGenerator.GenerateId());
- consumerInfo.SubscriptionName = name;
- consumerInfo.Selector = this.selector;
- consumerInfo.NoLocal = noLocal;
- Info = consumerInfo;
- messageQueue = new PriorityMessageQueue();
- delivered = new LinkedList<IMessageDelivery>();
- Configure();
- }
-
- #endregion
-
- #region Private Properties
-
- protected new IReceiverLink Link
- {
- get { return base.Link as IReceiverLink; }
-
- }
-
- // IsBrowser is a stub for an inherited Brower subclass
- protected virtual bool IsBrowser { get { return false; } }
-
- #endregion
-
- #region Internal Properties
-
- internal Id ConsumerId { get { return this.Info.Id; } }
-
- internal virtual bool IsDurable { get { return this.consumerInfo.SubscriptionName != null && this.consumerInfo.SubscriptionName.Length > 0; } }
-
- internal virtual bool HasSelector { get { return this.consumerInfo.Selector != null && this.consumerInfo.Selector.Length > 0; } }
-
- #endregion
-
- #region Private Methods
-
- private void AckReceived(IMessageDelivery delivery)
- {
- IMessageCloak cloak = delivery.Message.GetMessageCloak();
- if (cloak.AckHandler != null)
- {
- delivered.AddLast(delivery);
- }
- else
- {
- AckConsumed(delivery);
- }
- }
-
- private void AckConsumed(IMessageDelivery delivery)
- {
- Message.Message nmsMessage = delivery.Message;
- Tracer.DebugFormat("Consumer {0} Acking Accepted for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
- delivered.Remove(delivery);
- Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
- this.Link.Accept(amqpMessage);
- }
-
- private void AckReleased(IMessageDelivery delivery)
- {
- Message.Message nmsMessage = delivery.Message;
- Tracer.DebugFormat("Consumer {0} Acking Released for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
- Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
- this.Link.Release(amqpMessage);
- }
-
- private void AckRejected(IMessageDelivery delivery, NMSException ex)
- {
- Error err = new Error(NMSErrorCode.INTERNAL_ERROR);
- err.Description = ex.Message;
- AckRejected(delivery, err);
- }
-
- private void AckRejected(IMessageDelivery delivery, Error err = null)
- {
- Tracer.DebugFormat("Consumer {0} Acking Rejected for Message {1} with error {2} ", ConsumerId, delivery.Message.NMSMessageId, err?.ToString());
- Amqp.Message amqpMessage = (delivery.Message.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
- this.Link.Reject(amqpMessage, err);
- }
-
- private void AckModified(IMessageDelivery delivery, bool deliveryFailed, bool undeliverableHere = false)
- {
- Message.Message nmsMessage = delivery.Message;
- Tracer.DebugFormat("Consumer {0} Acking Modified for Message {1}{2}{3}.", ConsumerId, nmsMessage.NMSMessageId,
- deliveryFailed ? " Delivery Failed" : "",
- undeliverableHere ? " Undeliveryable Here" : "");
- Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
- //TODO use Link.Modified from amqpNetLite 2.0.0
- this.Link.Modify(amqpMessage, deliveryFailed, undeliverableHere, null);
- }
-
- private bool IsMessageRedeliveryExceeded(IMessageDelivery delivery)
- {
- Message.Message message = delivery.Message;
- IRedeliveryPolicy policy = this.Session.Connection.RedeliveryPolicy;
- if (policy != null && policy.MaximumRedeliveries >= 0)
- {
- IMessageCloak msgCloak = message.GetMessageCloak();
- return msgCloak.RedeliveryCount > policy.MaximumRedeliveries;
- }
- return false;
- }
-
- private bool IsMessageExpired(IMessageDelivery delivery)
- {
- Message.Message message = delivery.Message;
- if (message.NMSTimeToLive != TimeSpan.Zero)
- {
- DateTime now = DateTime.UtcNow;
- if (!IsBrowser && (message.NMSTimestamp + message.NMSTimeToLive) < now)
- {
- return true;
- }
- }
- return false;
- }
-
- #endregion
-
- #region Protected Methods
-
- protected void EnterMessageListenerEvent()
- {
- try
- {
- if(!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
- MessageListenerInUseEvent.Reset();
- }
- catch (Exception e)
- {
- Tracer.ErrorFormat("Failed to Reset MessageListener Event signal. Error : {0}", e);
- }
-
- }
-
- protected void LeaveMessageListenerEvent()
- {
- RemoveTaskRef();
- try
- {
- if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
- MessageListenerInUseEvent.Set();
- }
- catch (Exception e)
- {
- Tracer.ErrorFormat("Failed to Send MessageListener Event signal. Error : {0}", e);
- }
- }
-
- protected bool WaitOnMessageListenerEvent(int timeout = -1)
- {
- bool signaled = false;
- if (OnMessage != null)
- {
- if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
- {
- signaled = (timeout > -1) ? MessageListenerInUseEvent.WaitOne(timeout) : MessageListenerInUseEvent.WaitOne();
- }
- else if (!this.IsClosed)
- {
- Tracer.WarnFormat("Failed to wait for Message Listener Event on consumer {0}", Id);
- }
- }
-
- return signaled;
- }
-
- protected void AddTaskRef()
- {
- System.Threading.Interlocked.Increment(ref pendingMessageDeliveryTasks);
- int lastPending = MaxPendingTasks;
- MaxPendingTasks = Math.Max(pendingMessageDeliveryTasks, MaxPendingTasks);
- if (lastPending < MaxPendingTasks)
- {
- //Tracer.WarnFormat("Consumer {0} Distpatch event highwatermark increased to {1}.", Id, MaxPendingTasks);
- }
- }
-
- protected void RemoveTaskRef()
- {
- System.Threading.Interlocked.Decrement(ref pendingMessageDeliveryTasks);
- }
-
- protected void OnInboundMessage(IReceiverLink link, Amqp.Message message)
- {
- Message.Message msg = null;
- try
- {
- IMessage nmsMessage = Message.AMQP.AMQPMessageBuilder.CreateProviderMessage(this, message);
- msg = nmsMessage as Message.Message;
- if(
- Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
- Session.AcknowledgementMode.Equals(AcknowledgementMode.ClientAcknowledge)
- )
- {
- msg.GetMessageCloak().AckHandler = new Message.MessageAcknowledgementHandler(this, msg);
- }
- }
- catch (Exception e)
- {
- this.Session.OnException(e);
- }
-
- if(msg != null)
- {
- transportMsgCount++;
-
- SendForDelivery(new MessageDelivery(msg));
- }
- }
-
- protected void SendForDelivery(IMessageDelivery nmsDelivery)
- {
- this.messageQueue.Enqueue(nmsDelivery);
-
- if (this.OnMessage != null && pendingMessageDeliveryTasks < ExecutorThreshold)
- {
- AddTaskRef();
- DispatchEvent dispatchEvent = new MessageListenerDispatchEvent(this);
- Session.Dispatcher.Enqueue(dispatchEvent);
- }
- else if (pendingMessageDeliveryTasks < 0)
- {
- Tracer.ErrorFormat("Consumer {0} has invalid pending tasks count on executor {1}.", Id, pendingMessageDeliveryTasks);
- }
- }
-
- protected virtual void OnAttachResponse(ILink link, Attach attachResponse)
- {
- Tracer.InfoFormat("Received Performative Attach response on Link: {0}, Response: {1}", ConsumerId, attachResponse.ToString());
- OnResponse();
- if (link.Error != null)
- {
- this.Session.OnException(ExceptionSupport.GetException(link, "Consumer {0} Attach Failure.", this.ConsumerId));
- }
- }
-
- protected void SendFlow(int credit)
- {
- if (!mode.Value.Equals(Resource.Mode.Stopped))
- {
- this.Link.SetCredit(credit, false);
- }
- }
-
- protected virtual bool TryDequeue(out IMessageDelivery delivery, int timeout)
- {
- delivery = null;
- DateTime deadline = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeout);
- Tracer.DebugFormat("Waiting for msg availability Deadline {0}", deadline);
- try
- {
- while (true)
- {
- if(timeout == 0)
- {
- delivery = this.messageQueue.DequeueNoWait();
- }
- else
- {
- delivery = this.messageQueue.Dequeue(timeout);
- }
-
- if (delivery == null)
- {
- if (timeout == 0 || this.Link.IsClosed || this.messageQueue.IsClosed)
- {
- return false;
- }
- else if (timeout > 0)
- {
- timeout = Math.Max((deadline - DateTime.UtcNow).Milliseconds, 0);
- }
- }
- else if (IsMessageExpired(delivery))
- {
- DateTime now = DateTime.UtcNow;
- Error err = new Error(NMSErrorCode.PROPERTY_ERROR);
- err.Description = "Message Expired";
- AckRejected(delivery, err);
- if (timeout > 0)
- {
- timeout = Math.Max((deadline - now).Milliseconds, 0);
- }
- if(Tracer.IsDebugEnabled)
- Tracer.DebugFormat("{0} Filtered expired (deadline {1} Now {2}) message: {3}", ConsumerId, deadline, now, delivery);
- }
- else if (IsMessageRedeliveryExceeded(delivery))
- {
- if (Tracer.IsDebugEnabled)
- Tracer.DebugFormat("{0} Filtered Message with excessive Redelivery Count: {1}", ConsumerId, delivery);
- AckModified(delivery, true, true);
- if (timeout > 0)
- {
- timeout = Math.Max((deadline - DateTime.UtcNow).Milliseconds, 0);
- }
- }
- else
- {
- break;
- }
-
- }
- }
- catch(Exception e)
- {
- throw ExceptionSupport.Wrap(e, "Failed to Received message on consumer {0}.", ConsumerId);
- }
-
- return true;
- }
-
- protected void ThrowIfAsync()
- {
- if (this.OnMessage != null)
- {
- throw new IllegalStateException("Cannot synchronously receive message on a synchronous consumer " + consumerInfo);
- }
- }
-
- protected void DrainMessageQueueIfAny()
- {
- if (OnMessage != null && messageQueue.Count > 0)
- {
- DispatchEvent deliveryTask = new MessageListenerDispatchEvent(this);
- Session.Dispatcher.Enqueue(deliveryTask);
- }
- }
-
- protected void PrepareMessageForDelivery(Message.Message message)
- {
- if (message == null) return;
- if(message is Message.BytesMessage)
- {
- (message as Message.BytesMessage).Reset();
- }
- else if(message is Message.StreamMessage)
- {
- (message as Message.StreamMessage).Reset();
- }
- else
- {
- message.IsReadOnly = true;
- }
- message.IsReadOnlyProperties = true;
- }
-
- #endregion
-
- #region Internal Methods
-
- internal bool HasSubscription(string name)
- {
- return !IsClosed && IsDurable && String.Compare(name, this.consumerInfo.SubscriptionName, false) == 0;
- }
-
- internal bool IsUsingDestination(IDestination destination)
- {
- return this.Destination.Equals(destination);
- }
-
- internal void Recover()
- {
- Tracer.DebugFormat("Session recover for consumer: {0}", Id);
- IMessageCloak cloak = null;
- IMessageDelivery delivery = null;
- lock (messageQueue.SyncRoot)
- {
- while ((delivery = delivered.Last?.Value) != null)
- {
- cloak = delivery.Message.GetMessageCloak();
- cloak.DeliveryCount = cloak.DeliveryCount + 1;
- (delivery as MessageDelivery).EnqueueFirst = true;
- delivered.RemoveLast();
- SendForDelivery(delivery);
- }
- delivered.Clear();
- }
- }
-
- internal void AcknowledgeMessage(Message.Message message, Message.AckType ackType)
- {
-
- ThrowIfClosed();
- IMessageDelivery nmsDelivery = null;
- foreach (IMessageDelivery delivery in delivered)
- {
- if (delivery.Message.Equals(message))
- {
- nmsDelivery = delivery;
- }
- }
- if(nmsDelivery == null)
- {
- nmsDelivery = new MessageDelivery(message);
- }
- switch (ackType)
- {
- case Message.AckType.ACCEPTED:
- AckConsumed(nmsDelivery);
- break;
- case Message.AckType.MODIFIED_FAILED:
- AckModified(nmsDelivery, true);
- break;
- case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
- AckModified(nmsDelivery, true, true);
- break;
- case Message.AckType.REJECTED:
- AckRejected(nmsDelivery);
- break;
- case Message.AckType.RELEASED:
- AckReleased(nmsDelivery);
- break;
- default:
- throw new NMSException("Unkown message acknowledgement type " + ackType);
- }
- }
-
- internal void Acknowledge(Message.AckType ackType)
- {
-
- foreach(IMessageDelivery delivery in delivered.ToArray())
- {
- switch (ackType)
- {
- case Message.AckType.ACCEPTED:
- AckConsumed(delivery);
- break;
- case Message.AckType.MODIFIED_FAILED:
- AckModified(delivery, true);
- break;
- case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
- AckModified(delivery, true, true);
- break;
- case Message.AckType.REJECTED:
- AckRejected(delivery);
- break;
- case Message.AckType.RELEASED:
- AckReleased(delivery);
- break;
- default:
- Tracer.WarnFormat("Unkown message acknowledgement type {0} for message {}", ackType, delivery.Message.NMSMessageId);
- break;
- }
- }
- delivered.Clear();
- }
-
- #endregion
-
- #region IMessageConsumer Properties
-
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get
- {
- throw new NotImplementedException();
- }
-
- set
- {
- throw new NotImplementedException();
- }
- }
-
- #endregion
-
- #region IMessageConsumer Events
- protected event MessageListener OnMessage;
-
- public event MessageListener Listener
- {
- add
- {
-
- if (this.IsStarted)
- {
- throw new IllegalStateException("Cannot add MessageListener to consumer " + Id + " on a started Connection.");
- }
- if(value != null)
- {
- OnMessage += value;
- }
- }
- remove
- {
- if (this.IsStarted)
- {
- throw new IllegalStateException("Cannot remove MessageListener to consumer " + Id + " on a started Connection.");
- }
- if (value != null)
- {
- OnMessage -= value;
- }
- }
- }
-
- #endregion
-
- #region IMessageConsumer Methods
-
- public IMessage Receive()
- {
- ThrowIfClosed();
- ThrowIfAsync();
- if (TryDequeue(out IMessageDelivery delivery, -1))
- {
- Message.Message copy = delivery.Message.Copy();
- PrepareMessageForDelivery(copy);
- AckReceived(delivery);
- return copy;
- }
- return null;
- }
-
- public IMessage Receive(TimeSpan timeout)
- {
- ThrowIfClosed();
- ThrowIfAsync();
- int timeoutMilis = Convert.ToInt32(timeout.TotalMilliseconds);
- if(timeoutMilis == 0)
- {
- timeoutMilis = -1;
- }
- if (TryDequeue(out IMessageDelivery delivery, timeoutMilis))
- {
- Message.Message copy = delivery.Message.Copy();
- PrepareMessageForDelivery(copy);
- AckReceived(delivery);
- return copy;
- }
- return null;
- }
-
- public IMessage ReceiveNoWait()
- {
- ThrowIfClosed();
- ThrowIfAsync();
- if (TryDequeue(out IMessageDelivery delivery, 0))
- {
- Message.Message copy = delivery.Message.Copy();
- PrepareMessageForDelivery(copy);
- AckReceived(delivery);
- return copy;
- }
- return null;
- }
-
- #endregion
-
- #region MessageLink Methods
-
- private Target CreateTarget()
- {
- Target target = new Target();
- return target;
- }
-
- private Source CreateSource()
- {
- Source source = new Source();
- source.Address = UriUtil.GetAddress(Destination, this.Session.Connection);
- source.Outcomes = new Amqp.Types.Symbol[]
- {
- SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
- SymbolUtil.ATTACH_OUTCOME_RELEASED,
- SymbolUtil.ATTACH_OUTCOME_REJECTED,
- SymbolUtil.ATTACH_OUTCOME_MODIFIED
- };
- source.DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE;
-
- if (this.IsDurable)
- {
- source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
- source.Durable = (int)TerminusDurability.UNSETTLED_STATE;
- source.DistributionMode=SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
- }
- else
- {
- source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
- source.Durable = (int)TerminusDurability.NONE;
- }
-
- if (this.IsBrowser)
- {
- source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
- }
-
- source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
-
- Amqp.Types.Map filters = new Amqp.Types.Map();
-
- // TODO add filters for noLocal and Selector using appropriate Amqp Described types
-
- // No Local
- // qpid jms defines a no local filter as an amqp described type
- // AmqpJmsNoLocalType where
- // Descriptor = 0x0000468C00000003UL
- // Described = "NoLocalFilter{}" (type string)
- if (consumerInfo.NoLocal)
- {
- filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
- }
-
- // Selector
- // qpid jms defines a selector filter as an amqp described type
- // AmqpJmsSelectorType where
- // Descriptor = 0x0000468C00000004UL
- // Described = "<selector_string>" (type string)
- if (this.HasSelector)
- {
- filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, this.consumerInfo.Selector);
- }
-
- // Assign filters
- if (filters.Count > 0)
- {
- source.FilterSet = filters;
- }
-
- return source;
- }
-
- protected override ILink CreateLink()
- {
- Attach attach = new Amqp.Framing.Attach()
- {
- Target = CreateTarget(),
- Source = CreateSource(),
- RcvSettleMode = ReceiverSettleMode.First,
- SndSettleMode = (IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
- };
- string name = null;
- if (IsDurable)
- {
- name = consumerInfo.SubscriptionName;
- }
- else
- {
- string destinationAddress = (attach.Source as Source).Address ?? "";
- name = "nms:receiver:" + this.ConsumerId.ToString()
- + ((destinationAddress.Length == 0) ? "" : (":" + destinationAddress));
- }
- IReceiverLink link = new ReceiverLink(Session.InnerSession as Amqp.Session, name, attach, OnAttachResponse);
- return link;
- }
-
- protected override void OnInternalClosed(IAmqpObject sender, Error error)
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Received Close notification for MessageConsumer {0} {1} {2}",
- this.Id,
- IsDurable ? "with subscription name " + this.consumerInfo.SubscriptionName : "",
- error == null ? "" : "with cause " + error);
- }
- base.OnInternalClosed(sender, error);
- this.OnResponse();
- }
-
- protected override void StopResource()
- {
- if (Session.Dispatcher.IsOnDispatchThread)
- {
- throw new IllegalStateException("Cannot stop Connection {0} in MessageListener.", Session.Connection.ClientId);
- }
- // Cut message window
- // TODO figure out draining message window without raising a closed window exception (link-credit-limit-exceeded Error) from amqpnetlite.
- //SendFlow(1);
- // Stop message delivery
- this.messageQueue.Stop();
- // Now wait until the MessageListener callback is finish executing.
- this.WaitOnMessageListenerEvent();
- }
-
- protected override void StartResource()
- {
- // Do Attach request if not done already
- base.StartResource();
- // Start Message Delivery
- messageQueue.Start();
- DrainMessageQueueIfAny();
-
- // Setup AMQP message transport thread callback
- OnInboundAMQPMessage = OnInboundMessage;
- // Open Message Window to receive messages.
- this.Link.Start(consumerInfo.LinkCredit, OnInboundAMQPMessage);
-
- }
-
-
-
- /// <summary>
- /// Executes the AMQP network detach operation.
- /// </summary>
- /// <param name="timeout">
- /// Timeout to wait for for detach response. A timeout of 0 or less will not block to wait for a response.
- /// </param>
- /// <param name="cause">Error to detach link. Can be null.</param>
- /// <exception cref="Amqp.AmqpException">
- /// Throws when an error occur during amqp detach. Or contains Error response from detach.
- /// </exception>
- /// <exception cref="System.TimeoutException">
- /// Throws when detach response is not received in specified timeout.
- /// </exception>
- protected override void DoClose(TimeSpan timeout, Error cause = null)
- {
- if(IsDurable)
- {
- Task t = this.Link.DetachAsync(cause);
- if(TimeSpan.Compare(timeout, TimeSpan.Zero) > 0)
- {
- /*
- * AmqpNetLite does not allow a timeout to be specific for link detach request even though
- * it uses the same close operation which takes a parameter for timeout. AmqpNetLite uses
- * it default timeout of 60000ms, see AmqpObject.DefaultTimeout, for the detach close
- * operation forcing the detach request to be synchronous. To allow for asynchronous detach
- * request an NMS MessageConsumer must call the DetachAsync method on a link which will block
- * for up to 60000ms asynchronously to set a timeout exception or complete the task.
- */
- const int amqpNetLiteDefaultTimeoutMillis = 60000; // taken from AmqpObject.DefaultTimeout
- TimeSpan amqpNetLiteDefaultTimeout = TimeSpan.FromMilliseconds(amqpNetLiteDefaultTimeoutMillis);
- // Create timeout which allows for the 60000ms block in the DetachAsync task.
- TimeSpan actualTimeout = amqpNetLiteDefaultTimeout + timeout;
-
- TaskUtil.Wait(t, actualTimeout);
- if(t.Exception != null)
- {
- if(t.Exception is AggregateException)
- {
- throw t.Exception.InnerException;
- }
- else
- {
- throw t.Exception;
- }
- }
- }
- }
- else
- {
- base.DoClose(timeout, cause);
- }
- }
- /// <summary>
- /// Overload for the Template method <see cref="MessageLink.Shutdown"/> specific to <see cref="MessageConsumer"/>.
- /// </summary>
- /// <param name="closeMessageQueue">Indicates whether or not to close the messageQueue for the MessageConsumer.</param>
- internal override void Shutdown()
- {
- this.messageQueue.Close();
- }
-
- #endregion
-
- #region IDisposable Methods
- public void Dispose()
- {
- try
- {
- this.Close();
- }
- catch (Exception ex)
- {
- Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", this.GetType().Name, this.Id, ex);
- }
- }
- protected override void Dispose(bool disposing)
- {
- if (!IsClosing && !IsClosed)
- {
- Tracer.InfoFormat("Consumer {0} stats: Transport Msgs {1}, Dispatch Msgs {2}, messageQueue {3}.",
- Id, transportMsgCount, messageDispatchCount, messageQueue.Count);
- }
- base.Dispose(disposing);
- MessageListenerInUseEvent.Dispose();
- }
-
-
- #endregion
-
- #region Inner MessageListenerDispatchEvent Class
-
- protected class MessageListenerDispatchEvent : WaitableDispatchEvent
- {
- private MessageConsumer consumer;
-
- internal MessageListenerDispatchEvent(MessageConsumer consumer) : base()
- {
- this.consumer = consumer;
- Callback = this.DispatchMessageListeners;
- }
-
- public override void OnFailure(Exception e)
- {
- base.OnFailure(e);
- consumer.Session.OnException(e);
- }
-
- public void DispatchMessageListeners()
- {
- IMessageDelivery delivery = null;
- Message.Message nmsProviderMessage = null;
- if (consumer.IsClosed) return;
- consumer.EnterMessageListenerEvent();
- // the consumer pending Message delivery task
-
- while ((delivery = consumer.messageQueue.DequeueNoWait()) != null)
- {
- nmsProviderMessage = delivery.Message;
- consumer.AddTaskRef();
- consumer.messageDispatchCount++;
- try
- {
-
- if (consumer.IsMessageExpired(delivery))
- {
- consumer.AckModified(delivery, true);
- }
- else if (consumer.IsMessageRedeliveryExceeded(delivery))
- {
- consumer.AckModified(delivery, true, true);
- }
- else
- {
-
- bool deliveryFailed = false;
- bool isAutoOrDupsOk = consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
- consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.DupsOkAcknowledge);
- if (isAutoOrDupsOk)
- {
- consumer.delivered.AddLast(delivery);
- }
- else
- {
- consumer.AckReceived(delivery);
- }
-
- Message.Message copy = nmsProviderMessage.Copy();
- try
- {
- consumer.Session.ClearRecovered();
- consumer.PrepareMessageForDelivery(copy);
- if (Tracer.IsDebugEnabled)
- Tracer.DebugFormat("Invoking Client Message Listener Callback for message {0}.", copy.NMSMessageId);
- consumer.OnMessage(copy);
- }
- catch (SystemException se)
- {
- Tracer.WarnFormat("Caught Exception on MessageListener for Consumer {0}. Message {1}.", consumer.Id, se.Message);
- deliveryFailed = true;
- }
-
- if (isAutoOrDupsOk && !consumer.Session.IsRecovered)
- {
- if (!deliveryFailed)
- {
- consumer.AckConsumed(delivery);
- }
- else
- {
- consumer.AckReleased(delivery);
- }
- }
- }
-
- }
- catch (Exception e)
- {
- // unhandled failure
- consumer.Session.OnException(e);
- }
- consumer.RemoveTaskRef();
- }
- consumer.LeaveMessageListenerEvent();
- }
- }
-
- #endregion
- }
-
- #region Info class
- internal class ConsumerInfo : LinkInfo
- {
-
- protected const int DEFAULT_CREDIT = 200;
-
- private int? credit = null;
-
- internal ConsumerInfo(Id id) : base(id) { }
-
- public int LinkCredit
- {
- get { return credit ?? DEFAULT_CREDIT; }
- internal set { credit = value; }
- }
-
- public string Selector { get; internal set; } = null;
- public string SubscriptionName { get; internal set; } = null;
-
- public bool NoLocal { get; internal set; } = false;
-
- }
-
- #endregion
-
-}
diff --git a/src/NMS.AMQP/MessageLink.cs b/src/NMS.AMQP/MessageLink.cs
deleted file mode 100644
index 8958f9b..0000000
--- a/src/NMS.AMQP/MessageLink.cs
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp;
-using Amqp.Framing;
-using System.Reflection;
-using Apache.NMS.AMQP.Util;
-using System.Collections.Specialized;
-
-namespace Apache.NMS.AMQP
-{
-
- internal enum LinkState
- {
- UNKNOWN = -1,
- INITIAL = 0,
- ATTACHSENT = 1,
- ATTACHED = 2,
- DETACHSENT = 3,
- DETACHED = 4
- }
-
- internal enum TerminusDurability
- {
- NONE = 0,
- CONFIGURATION = 1,
- UNSETTLED_STATE = 2,
- }
-
- /// <summary>
- /// Abstract Template for AmqpNetLite Amqp.ILink.
- /// This class Templates the performative Attach and Detached process for the amqp procotol engine class.
- /// The template operations are Attach and Detach.
- /// </summary>
- abstract class MessageLink : NMSResource<LinkInfo>
- {
- private CountDownLatch responseLatch=null;
- private ILink impl;
- private Atomic<LinkState> state = new Atomic<LinkState>(LinkState.INITIAL);
- private readonly Session session;
- private readonly IDestination destination;
- private System.Threading.ManualResetEvent PerformativeOpenEvent = new System.Threading.ManualResetEvent(false);
-
- protected MessageLink(Session ses, Destination dest)
- {
- session = ses;
- destination = dest;
- }
-
- protected MessageLink(Session ses, IDestination dest)
- {
- session = ses;
- if(dest is Destination || dest == null)
- {
- destination = dest as Destination;
- }
- else
- {
- if (!dest.IsTemporary)
- {
- if(dest.IsQueue)
- {
- destination = Session.GetQueue((dest as IQueue).QueueName) as Destination;
- }
- else
- {
- destination = Session.GetQueue((dest as ITopic).TopicName) as Destination;
- }
-
- }
- else
- {
- throw new NotImplementedException("Foreign temporary Destination Implementation Not Supported.");
- }
- }
-
- }
-
- internal virtual Session Session { get => session; }
-
- internal IDestination Destination { get { return destination; } }
-
- protected LinkState State { get => this.state.Value; }
-
- protected ILink Link
- {
- get { return impl; }
- private set { }
- }
-
- internal bool IsClosing { get { return state.Value.Equals(LinkState.DETACHSENT); } }
-
- internal bool IsClosed { get { return state.Value.Equals(LinkState.DETACHED); } }
-
- protected bool IsConfigurable { get { return state.Value.Equals(LinkState.INITIAL); } }
-
- protected bool IsOpening { get { return state.Value.Equals(LinkState.ATTACHSENT); } }
-
- protected bool IsRequestPending { get => this.responseLatch != null; }
-
- internal NMSException FailureCause { get; set; } = null;
-
- internal void SetFailureCause(Error error, string reason = "")
- {
- this.FailureCause = ExceptionSupport.GetException(error, reason);
- }
-
- internal void Attach()
- {
- if (state.CompareAndSet(LinkState.INITIAL, LinkState.ATTACHSENT))
- {
- PerformativeOpenEvent.Reset();
- responseLatch = new CountDownLatch(1);
- impl = CreateLink();
- this.Link.AddClosedCallback(this.OnInternalClosed);
- LinkState finishedState = LinkState.UNKNOWN;
- try
- {
- bool received = true;
- if (this.Info.requestTimeout <= 0)
- {
- responseLatch.await();
- }
- else
- {
- received = responseLatch.await(RequestTimeout);
- }
- if(received && this.impl.Error == null)
- {
- finishedState = LinkState.ATTACHED;
- }
- else
- {
- finishedState = LinkState.INITIAL;
- if (!received)
- {
- Tracer.InfoFormat("Link {0} Attach timeout", Info.Id);
- this.OnTimeout();
- }
- else
- {
- Tracer.InfoFormat("Link {0} Attach error: {1}", Info.Id, this.impl.Error);
- this.OnFailure();
- }
- }
-
-
- }
- finally
- {
- responseLatch = null;
- state.GetAndSet(finishedState);
- if(!state.Value.Equals(LinkState.ATTACHED) && !this.impl.IsClosed)
- {
- DoClose();
- }
- PerformativeOpenEvent.Set();
- }
-
- }
- }
-
- protected virtual void Detach()
- {
- if (state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHSENT))
- {
- try
- {
- DoClose();
- }
- catch (NMSException)
- {
- throw;
- }
- catch (Exception ex)
- {
- throw ExceptionSupport.Wrap(ex, "Failed to close Link {0}", this.Id);
- }
- }
- else if (state.CompareAndSet(LinkState.INITIAL, LinkState.DETACHED))
- {
- // Link has not been established yet set state to dettached.
- }
- else if (state.Value.Equals(LinkState.ATTACHSENT))
- {
- // The Message Link is trying to estalish a link. It should wait until the Attach response is processed.
- bool signaled = this.PerformativeOpenEvent.WaitOne(this.RequestTimeout);
- if (signaled)
- {
- if (state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHSENT))
- {
- // The Attach request completed succesfully establishing a link.
- // Now Close link.
- try
- {
- DoClose();
- }
- catch (NMSException)
- {
- throw;
- }
- catch (Exception ex)
- {
- throw ExceptionSupport.Wrap(ex, "Failed to close Link {0}", this.Id);
- }
- }
- else if (state.CompareAndSet(LinkState.INITIAL, LinkState.DETACHED))
- {
- // Failed to establish a link set state to Detached.
- }
- }
- else
- {
- // Failed to receive establishment event signal.
- state.GetAndSet(LinkState.DETACHED);
- }
-
-
- }
- }
-
- /// <summary>
- /// Defines the asynchronous Amqp.ILink error and close notification handler for the template.
- /// This Method matches the delegate <see cref="Amqp.ClosedCallback"/>.
- /// Concrete implementations are required to implement this method.
- /// </summary>
- /// <param name="sender">
- /// The <see cref="Amqp.IAmqpObject"/> that has closed. Also, <seealso cref="Amqp.ClosedCallback"/>.
- /// This will always be an ILink for the template.
- /// </param>
- /// <param name="error">
- /// The <see cref="Amqp.Framing.Error"/> that caused the link to close.
- /// This can be null should the link be closed intentially.
- /// </param>
- protected virtual void OnInternalClosed(Amqp.IAmqpObject sender, Error error)
- {
- bool failureThrow = true;
- Session parent = null;
- string name = (sender as ILink)?.Name;
- NMSException failure = ExceptionSupport.GetException(error,
- "Received Amqp link detach with Error for link {0}", name);
-
- try
- {
- parent = this.Session;
- this.FailureCause = failure;
- if (this.state.CompareAndSet(LinkState.DETACHSENT, LinkState.DETACHED))
- {
- // expected close
- parent.Remove(this);
- }
- else if (this.state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHED))
- {
- // unexpected close or parent close
- if (this.IsStarted)
- {
- // indicate unexpected close
- this.Shutdown();
- }
- else
- {
- failureThrow = false;
- }
-
- parent.Remove(this);
- }
- }
- catch (Exception ex)
- {
- // failure during NMS object instance cleanup.
- Tracer.DebugFormat("Caught exception during Amqp Link {0} close. Exception {1}", name, ex);
- }
-
- // Determine if there is a provider controlled call for this closed callback.
- if (!failureThrow && failure != null)
- {
- // no provider controlled call.
- // Log error if any.
-
- // Log exception.
- Tracer.InfoFormat("Amqp Performative detach response error. Message {0}", failure.Message);
-
- }
- }
-
- /// <summary>
- /// Defines the link create operation for the abstract template.
- /// Concrete implmentations are required to implement this method.
- /// </summary>
- /// <returns>
- /// An ILink that was configured by concrete implementation.
- /// </returns>
- protected abstract ILink CreateLink();
-
- /// <summary>
- /// See <see cref="MessageLink.DoClose(TimeSpan, Error)"/>.
- /// </summary>
- /// <param name="cause"></param>
- protected void DoClose(Error cause = null)
- {
- this.DoClose(TimeSpan.FromMilliseconds(Info.closeTimeout), cause);
- }
-
- /// <summary>
- /// Defines the link close operation for the abstract template.
- /// Concrete implementation can implement this method to override the default Link close operation.
- /// </summary>
- /// <param name="timeout">
- /// Timeout on network operation close for link.
- /// If greater then 0 then operation will be blocking.
- /// Otherwise the network operation is non-blocking.
- /// </param>
- /// <param name="cause">
- /// The amqp Error that caused the link to close.
- /// </param>
- /// <exception cref="Amqp.AmqpException">Throws for Detach response errors.</exception>
- /// <exception cref="System.TimeoutException">Throws when timeout expires for blocking detach request.</exception>
- protected virtual void DoClose(TimeSpan timeout, Error cause = null)
- {
- if (this.impl != null && !this.impl.IsClosed)
- {
- Tracer.DebugFormat("Detaching amqp link {0} for {1} with timeout {2}", this.Link.Name, this.Id, timeout);
- this.impl.Close(timeout, cause);
- }
- }
-
- protected virtual void OnResponse()
- {
- if (responseLatch != null)
- {
- responseLatch.countDown();
- }
- }
-
- protected virtual void OnTimeout()
- {
- throw ExceptionSupport.GetTimeoutException(this.impl, "Performative Attach Timeout while waiting for response.");
- }
-
- protected virtual void OnFailure()
- {
- throw ExceptionSupport.GetException(this.impl, "Performative Attach Error.");
- }
-
- protected virtual void Configure()
- {
- StringDictionary connProps = Session.Connection.Properties;
- StringDictionary sessProps = Session.Properties;
- PropertyUtil.SetProperties(Info, connProps);
- PropertyUtil.SetProperties(Info, sessProps);
-
- }
-
-
- #region NMSResource Methhods
-
- protected override void StartResource()
- {
- this.Attach();
- }
-
- protected override void ThrowIfClosed()
- {
- if (state.Value.Equals(LinkState.DETACHED) || this.FailureCause != null)
- {
- throw new Apache.NMS.IllegalStateException("Illegal operation on closed I" + this.GetType().Name + ".", this.FailureCause);
- }
- }
-
- #endregion
-
- #region Public Inheritable Properties
-
- public TimeSpan RequestTimeout
- {
- get { return TimeSpan.FromMilliseconds(Info.requestTimeout); }
- set { Info.requestTimeout = Convert.ToInt64(value.TotalMilliseconds); }
- }
-
- #endregion
-
- #region Public Inheritable Methods
-
- // All derived classes from MessageLink must be IDisposable.
- // Use the IDispoable pattern for Close() and have Dispose() just call Close() and Close() do
- // the things usually done in Dispose().
- public void Close()
- {
- this.Dispose(true);
- if (IsClosed)
- {
- GC.SuppressFinalize(this);
- }
- }
- /// <summary>
- /// Implements a template for the IDisposable Parttern.
- /// </summary>
- /// <param name="disposing"></param>
- protected virtual void Dispose(bool disposing)
- {
- if (this.IsClosed) return;
- if (disposing)
- {
- // orderly shutdown
- this.Stop();
- this.Shutdown();
- try
- {
- this.Detach();
- }
- catch (Exception ex)
- {
- // Log network errors
- Tracer.DebugFormat("Performative detached raised exception for {0} {1}. Message {2}",
- this.GetType().Name, this.Id, ex.Message);
- }
- finally
- {
- // in case detach failed or times out.
- if (!this.IsClosed)
- {
- this.Session.Remove(this);
- this.state.Value = LinkState.DETACHED;
- }
- this.impl = null;
- }
- }
- }
-
- #endregion
-
- #region Shutdown Template methods
-
- /// <summary>
- /// Shutdown orderly shuts down the Message link facilities.
- /// </summary>
- internal virtual void Shutdown()
- {
- }
-
-
- #endregion
- }
-
- #region LinkInfo Class
-
- internal abstract class LinkInfo : ResourceInfo
- {
- protected static readonly long DEFAULT_REQUEST_TIMEOUT;
- static LinkInfo()
- {
- DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
- }
-
- protected LinkInfo(Id linkId) : base(linkId)
- {
-
- }
-
- public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long closeTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long sendTimeout { get; set; }
-
- public override string ToString()
- {
- string result = "";
- result += "LinkInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
- if (prop.GetGetMethod(true).IsPublic)
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- }
-
- #endregion
-}
diff --git a/src/NMS.AMQP/MessageProducer.cs b/src/NMS.AMQP/MessageProducer.cs
deleted file mode 100644
index 08d1f7d..0000000
--- a/src/NMS.AMQP/MessageProducer.cs
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Specialized;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.Reflection;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Message;
-using Apache.NMS.AMQP.Message.AMQP;
-using Apache.NMS.AMQP.Message.Cloak;
-using Amqp;
-using Amqp.Framing;
-using System.Threading;
-
-namespace Apache.NMS.AMQP
-{
- /// <summary>
- /// Apache.NMS.AMQP.MessageProducer facilitates management and creates the underlying Amqp.SenderLink protocol engine object.
- /// Apache.NMS.AMQP.MessageProducer is also a Factory for Apache.NMS.AMQP.Message.Message types.
- /// </summary>
- class MessageProducer : MessageLink, IMessageProducer
- {
- private IdGenerator msgIdGenerator;
- private ISenderLink link;
- private ProducerInfo producerInfo;
-
- // Stat fields
- private int MsgsSentOnLink = 0;
-
- #region Constructor
-
- internal MessageProducer(Session ses, IDestination dest) : base(ses, dest)
- {
- producerInfo = new ProducerInfo(ses.ProducerIdGenerator.GenerateId());
- Info = producerInfo;
- Configure();
-
- }
-
- #endregion
-
- #region Internal Properties
-
- internal Session InternalSession { get { return Session; } }
-
- internal Id ProducerId { get { return producerInfo.Id; } }
-
- #endregion
-
- #region Private Methods
-
- private void ConfigureMessage(IMessage msg)
- {
- msg.NMSPriority = Priority;
- msg.NMSDeliveryMode = DeliveryMode;
- msg.NMSDestination = Destination;
- }
-
- private void OnAttachedResp(ILink link, Attach resp)
- {
- Tracer.InfoFormat("Received Performation Attach response on Link: {0}, Response: {1}", ProducerId, resp.ToString());
-
- OnResponse();
- }
-
- internal void OnException(Exception e)
- {
- Session.OnException(e);
- }
-
- private Target CreateTarget()
- {
- Target t = new Target();
-
- t.Address = UriUtil.GetAddress(Destination, this.Session.Connection);
-
- t.Timeout = (uint)producerInfo.sendTimeout;
-
- // Durable is used for a durable subscription
- t.Durable = (uint)TerminusDurability.NONE;
-
- if (Destination != null)
- {
- t.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
- }
- t.Dynamic = false;
-
- return t;
- }
-
- private Source CreateSource()
- {
- Source s = new Source();
- s.Address = this.ProducerId.ToString();
- s.Timeout = (uint)producerInfo.sendTimeout;
- s.Outcomes = new Amqp.Types.Symbol[]
- {
- SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
- SymbolUtil.ATTACH_OUTCOME_REJECTED,
- };
- return s;
- }
-
- private Attach CreateAttachFrame()
- {
- Attach frame = new Attach();
- frame.Source = CreateSource();
- frame.Target = CreateTarget();
- frame.SndSettleMode = SenderSettleMode.Unsettled;
- frame.IncompleteUnsettled = false;
- frame.InitialDeliveryCount = 0;
-
- return frame;
- }
-
- #endregion
-
- #region MessageLink abstract Methods
-
- protected override ILink CreateLink()
- {
- Attach frame = CreateAttachFrame();
-
- string linkName = producerInfo.Id + ":" + UriUtil.GetAddress(Destination, Session.Connection);
- link = new SenderLink(Session.InnerSession as Amqp.Session, linkName, frame, OnAttachedResp);
-
- return link;
- }
-
- protected override void OnInternalClosed(IAmqpObject sender, Error error)
- {
- base.OnInternalClosed(sender, error);
- this.OnResponse();
- }
-
- #endregion
-
- #region NMSResource Methods
-
- protected override void StopResource()
- {
-
- }
-
- #endregion
-
- #region IMessageProducer Properties
-
- public MsgDeliveryMode DeliveryMode
- {
- get { return producerInfo.msgDelMode; }
- set { producerInfo.msgDelMode = value; }
- }
-
- public bool DisableMessageID
- {
- get { return producerInfo.disableMsgId; }
- set { producerInfo.disableMsgId = value; }
- }
-
- public bool DisableMessageTimestamp
- {
- get { return producerInfo.disableTimeStamp; }
- set { producerInfo.disableTimeStamp = value; }
- }
-
- public MsgPriority Priority
- {
- get { return producerInfo.priority; }
- set { producerInfo.priority = value; }
- }
-
- public ProducerTransformerDelegate ProducerTransformer
- {
- get
- {
- throw new NotImplementedException();
- }
-
- set
- {
- throw new NotImplementedException();
- }
- }
-
- public TimeSpan TimeToLive
- {
- get { return TimeSpan.FromMilliseconds(producerInfo.ttl); }
- set { producerInfo.ttl = Convert.ToInt64(value.TotalMilliseconds); }
- }
-
- #endregion
-
- #region IMessageProducer Methods
-
- public IBytesMessage CreateBytesMessage()
- {
- this.ThrowIfClosed();
- return Session.CreateBytesMessage();
- }
-
- public IBytesMessage CreateBytesMessage(byte[] body)
- {
- this.ThrowIfClosed();
- IBytesMessage msg = CreateBytesMessage();
- msg.WriteBytes(body);
- return msg;
- }
-
- public IMapMessage CreateMapMessage()
- {
- this.ThrowIfClosed();
- return Session.CreateMapMessage();
- }
-
- public IMessage CreateMessage()
- {
- this.ThrowIfClosed();
- return Session.CreateMessage();
- }
-
- public IObjectMessage CreateObjectMessage(object body)
- {
- this.ThrowIfClosed();
- return Session.CreateObjectMessage(body);
- }
-
- public IStreamMessage CreateStreamMessage()
- {
- this.ThrowIfClosed();
- return Session.CreateStreamMessage();
- }
-
- public ITextMessage CreateTextMessage()
- {
- this.ThrowIfClosed();
- return Session.CreateTextMessage();
- }
-
- public ITextMessage CreateTextMessage(string text)
- {
- ITextMessage msg = CreateTextMessage();
- msg.Text = text;
- return msg;
- }
-
- protected override void Dispose(bool disposing)
- {
- bool wasNotClosed = !IsClosed;
- base.Dispose(disposing);
- if (IsClosed && wasNotClosed)
- {
- Tracer.InfoFormat("Closing Producer {0}, MsgSentOnLink {1}", Id, MsgsSentOnLink);
- }
- }
-
- public void Dispose()
- {
- this.Close();
- }
-
- public void Send(IMessage message)
- {
- Send(message, DeliveryMode, Priority, TimeToLive);
- }
-
- public void Send(IDestination destination, IMessage message)
- {
- Send(destination, message, DeliveryMode, Priority, TimeToLive);
- }
-
- public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- this.ThrowIfClosed();
- if (Destination == null)
- {
- throw new IllegalStateException("Can not Send message on Anonymous Producer (without Destination).");
- }
- if (message == null)
- {
- throw new IllegalStateException("Can not Send a null message.");
- }
-
- DoSend(Destination, message, deliveryMode, priority, timeToLive);
- }
-
- public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- this.ThrowIfClosed();
- if (Destination != null)
- {
- throw new IllegalStateException("Can not Send message on Fixed Producer (with Destination).");
- }
- if (message == null)
- {
- throw new IllegalStateException("Can not Send a null message.");
- }
-
- DoSend(destination, message, deliveryMode, priority, timeToLive);
- }
-
- #endregion
-
- #region Protected Methods
-
- protected override void Configure()
- {
- base.Configure();
- }
-
- protected IdGenerator MessageIdGenerator
- {
- get
- {
- if (msgIdGenerator == null)
- {
- msgIdGenerator = new CustomIdGenerator(
- true,
- "ID",
- new AtomicSequence()
- );
- }
- return msgIdGenerator;
- }
- }
-
- protected void PrepareMessageForSend(Message.Message message)
- {
- if (message == null) return;
- if (message is Message.BytesMessage)
- {
- (message as Message.BytesMessage).Reset();
- }
- else if (message is Message.StreamMessage)
- {
- (message as Message.StreamMessage).Reset();
- }
- else
- {
- message.IsReadOnly = true;
- }
- message.IsReadOnlyProperties = true;
- }
-
- protected void DoSend(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
- {
- this.Attach();
- bool sendSync = deliveryMode.Equals(MsgDeliveryMode.Persistent);
- if(destination.IsTemporary && (destination as TemporaryDestination).IsDeleted)
- {
- throw new InvalidDestinationException("Can not send message on deleted temporary topic.");
- }
- message.NMSDestination = destination;
- message.NMSDeliveryMode = deliveryMode;
- message.NMSPriority = priority;
- // If there is timeToLive, set it before setting NMSTimestamp as timeToLive
- // is required to calculate absolute expiry time.
- // TBD: If the messageProducer has a non-default timeToLive and the message
- // already has a timeToLive set by application, which should take precedence, this
- // code overwrites the message TimeToLive in this case but not if the producer TimeToLive
- // is the default ...
- if (timeToLive != NMSConstants.defaultTimeToLive)
- {
- message.NMSTimeToLive = timeToLive;
- }
- if (!DisableMessageTimestamp)
- {
- message.NMSTimestamp = DateTime.UtcNow;
- }
-
-
- if (!DisableMessageID)
- {
- message.NMSMessageId = MessageIdGenerator.GenerateId().ToString();
- }
-
- Amqp.Message amqpmsg = null;
- if (message is Message.Message)
- {
- Message.Message copy = (message as Message.Message).Copy();
- copy.NMSDestination = DestinationTransformation.Transform(Session.Connection, destination);
- PrepareMessageForSend(copy);
- IMessageCloak cloak = copy.GetMessageCloak();
- if (cloak is AMQPMessageCloak)
- {
- amqpmsg = (cloak as AMQPMessageCloak).AMQPMessage;
- }
- }
- else
- {
- Message.Message nmsmsg = this.Session.Connection.TransformFactory.TransformMessage<Message.Message>(message);
- PrepareMessageForSend(nmsmsg);
- IMessageCloak cloak = nmsmsg.GetMessageCloak().Copy();
- if (cloak is AMQPMessageCloak)
- {
- amqpmsg = (cloak as AMQPMessageCloak).AMQPMessage;
- }
- }
-
-
-
- if (amqpmsg != null)
- {
- if (Tracer.IsDebugEnabled)
- Tracer.DebugFormat("Sending message : {0}", message.ToString());
-
- if(sendSync)
- {
- DoAMQPSendSync(amqpmsg, this.RequestTimeout);
- }
- else
- {
- DoAMQPSendAsync(amqpmsg, HandleAsyncAMQPMessageOutcome);
- }
- }
-
- }
-
- protected void DoAMQPSendAsync(Amqp.Message amqpMessage, OutcomeCallback ackCallback)
- {
-
- try
- {
- this.link.Send(amqpMessage, ackCallback, this);
- MsgsSentOnLink++;
- }
- catch(Exception ex)
- {
- Tracer.ErrorFormat("Encountered Error on sending message from Producer {0}. Message: {1}. Stack : {2}.", Id, ex.Message, ex.StackTrace);
- throw ExceptionSupport.Wrap(ex);
- }
- }
-
- private static void HandleAsyncAMQPMessageOutcome(Amqp.ILink sender, Amqp.Message message, Outcome outcome, object state)
- {
- MessageProducer thisPtr = state as MessageProducer;
- Exception failure = null;
- bool isProducerClosed = (thisPtr.IsClosing || thisPtr.IsClosed);
- if (outcome.Descriptor.Name.Equals(MessageSupport.REJECTED_INSTANCE.Descriptor.Name) && !isProducerClosed)
- {
- string msgId = MessageSupport.CreateNMSMessageId(message.Properties.GetMessageId());
- Error err = (outcome as Amqp.Framing.Rejected).Error;
- failure = ExceptionSupport.GetException(err, "Msg {0} rejected", msgId);
- }
- else if (outcome.Descriptor.Name.Equals(MessageSupport.RELEASED_INSTANCE.Descriptor.Name) && !isProducerClosed)
- {
- string msgId = MessageSupport.CreateNMSMessageId(message.Properties.GetMessageId());
- Error err = new Error(ErrorCode.MessageReleased);
- err.Description = "AMQP Message has been release by peer.";
- failure = ExceptionSupport.GetException(err, "Msg {0} released", msgId);
- }
- if (failure != null && !isProducerClosed)
- {
- thisPtr.OnException(failure);
- }
-
- }
-
- protected void DoAMQPSendSync(Amqp.Message amqpMessage, TimeSpan timeout)
- {
- try
- {
- this.link.Send(amqpMessage, timeout);
- MsgsSentOnLink++;
- }
- catch (TimeoutException tex)
- {
- throw ExceptionSupport.GetTimeoutException(this.link, tex.Message);
- }
- catch (AmqpException amqpEx)
- {
- string messageId = MessageSupport.CreateNMSMessageId(amqpMessage.Properties.GetMessageId());
- throw ExceptionSupport.Wrap(amqpEx, "Failure to send message {1} on Producer {0}", this.Id, messageId);
- }
- catch (Exception ex)
- {
- Tracer.ErrorFormat("Encountered Error on sending message from Producer {0}. Message: {1}. Stack : {2}.", Id, ex.Message, ex.StackTrace);
- throw ExceptionSupport.Wrap(ex);
- }
- }
-
- #endregion
-
-
- }
-
- #region Producer Info Class
-
- internal class ProducerInfo : LinkInfo
- {
... 27676 lines suppressed ...