You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/07/17 09:09:46 UTC

[activemq-nms-amqp] branch master updated: AMQNET-589: Failover implementation

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/master by this push:
     new e9d8fd2  AMQNET-589: Failover implementation
     new 2d565c7  Merge pull request #4 from cityindex/failover_implementation
e9d8fd2 is described below

commit e9d8fd2264d22a6531fe757e7f6f0fa68190cd71
Author: Havret <h4...@gmail.com>
AuthorDate: Sat Jun 1 09:21:41 2019 +0200

    AMQNET-589: Failover implementation
---
 .gitignore                                         |    1 +
 apache-nms-amqp.sln                                |    6 +-
 src/HelloWorld/HelloWorld.cs                       |   35 +-
 src/HelloWorld/HelloWorld.csproj                   |    1 +
 src/NMS.AMQP/Apache-NMS-AMQP.csproj                |    2 +
 src/NMS.AMQP/Connection.cs                         | 1160 -----------------
 src/NMS.AMQP/ConnectionFactory.cs                  |  439 +------
 src/NMS.AMQP/ConnectionMetaData.cs                 |    2 +-
 src/NMS.AMQP/Destination.cs                        |  341 -----
 ...esMessageCloak.cs => INmsConnectionListener.cs} |   25 +-
 src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs |  217 ----
 src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs   |  163 ---
 src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs    |  216 ----
 src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs      |  650 ----------
 .../Message/AMQP/AMQPMessageTransformation.cs      |   79 --
 .../Message/AMQP/AMQPObjectMessageCloak.cs         |  413 ------
 .../Message/AMQP/AMQPStreamMessageCloak.cs         |  211 ---
 src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs  |  141 --
 .../{Cloak/ITextMessageCloak.cs => AckType.cs}     |   22 +-
 .../INmsBytesMessageFacade.cs}                     |   22 +-
 .../INmsMapMessageFacade.cs}                       |   17 +-
 .../INmsMessageFacade.cs}                          |   58 +-
 .../INmsObjectMessageFacade.cs}                    |   16 +-
 .../INmsStreamMessageFacade.cs}                    |   24 +-
 .../INmsTextMessageFacade.cs}                      |   12 +-
 src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs |  137 --
 src/NMS.AMQP/Message/Factory/IMessageFactory.cs    |   75 --
 src/NMS.AMQP/Message/Factory/MessageFactory.cs     |   92 --
 ...StreamMessageCloak.cs => INmsMessageFactory.cs} |   34 +-
 ...esMessageCloak.cs => InboundMessageDispatch.cs} |   26 +-
 src/NMS.AMQP/Message/MapMessage.cs                 |   72 --
 src/NMS.AMQP/Message/Message.cs                    |  287 -----
 .../{BytesMessage.cs => NmsBytesMessage.cs}        |  469 +++----
 .../AtomicSequence.cs => Message/NmsMapMessage.cs} |   48 +-
 src/NMS.AMQP/Message/NmsMessage.cs                 |  233 ++++
 src/NMS.AMQP/Message/NmsMessageTransformation.cs   |  158 +++
 .../{ObjectMessage.cs => NmsObjectMessage.cs}      |   56 +-
 src/NMS.AMQP/Message/NmsStreamMessage.cs           |  469 +++++++
 .../NmsTextMessage.cs}                             |   44 +-
 ...pMessageCloak.cs => OutboundMessageDispatch.cs} |   22 +-
 src/NMS.AMQP/Message/StreamMessage.cs              |  418 ------
 src/NMS.AMQP/Message/TextMessage.cs                |   67 -
 src/NMS.AMQP/MessageConsumer.cs                    |  977 --------------
 src/NMS.AMQP/MessageLink.cs                        |  506 --------
 src/NMS.AMQP/MessageProducer.cs                    |  555 --------
 src/NMS.AMQP/Meta/ConnectionInfo.cs                |  134 ++
 src/NMS.AMQP/Meta/ConsumerInfo.cs                  |   52 +
 src/NMS.AMQP/Meta/LinkInfo.cs                      |   61 +
 src/NMS.AMQP/Meta/ProducerInfo.cs                  |   74 ++
 .../ResourceInfo.cs}                               |   27 +-
 src/NMS.AMQP/Meta/SessionInfo.cs                   |   66 +
 src/NMS.AMQP/NMSConnectionFactory.cs               |   40 -
 src/NMS.AMQP/NMSResource.cs                        |  158 ---
 src/NMS.AMQP/NmsAcknowledgeCallback.cs             |   58 +
 src/NMS.AMQP/NmsConnection.cs                      |  563 ++++++++
 src/NMS.AMQP/NmsConnectionFactory.cs               |  249 ++++
 src/NMS.AMQP/NmsMessageConsumer.cs                 |  469 +++++++
 src/NMS.AMQP/NmsMessageProducer.cs                 |  256 ++++
 .../Cloak/IStreamMessageCloak.cs => NmsQueue.cs}   |   36 +-
 src/NMS.AMQP/NmsSession.cs                         |  525 ++++++++
 ...tomicSequence.cs => NmsTemporaryDestination.cs} |   49 +-
 ...derTransportContext.cs => NmsTemporaryQueue.cs} |   37 +-
 ...derTransportContext.cs => NmsTemporaryTopic.cs} |   43 +-
 .../Cloak/IBytesMessageCloak.cs => NmsTopic.cs}    |   29 +-
 src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs       |  199 +++
 .../Provider/Amqp/AmqpConnectionSession.cs         |   64 +
 src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs         |  310 +++++
 src/NMS.AMQP/Provider/Amqp/AmqpMessageIdHelper.cs  |  309 +++++
 src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs         |  193 +++
 src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs         |  214 +++
 .../Amqp/AmqpProviderFactory.cs}                   |   47 +-
 src/NMS.AMQP/Provider/Amqp/AmqpSession.cs          |  149 +++
 .../Provider/Amqp/AmqpTemporaryDestination.cs      |  114 ++
 src/NMS.AMQP/Provider/Amqp/Message/AmqpCodec.cs    |  140 ++
 .../Provider/Amqp/Message/AmqpMessageFactory.cs    |   94 ++
 .../Amqp/Message/AmqpNmsBytesMessageFacade.cs      |  195 +++
 .../Amqp/Message/AmqpNmsMapMessageFacade.cs        |   92 ++
 .../Provider/Amqp/Message/AmqpNmsMessageFacade.cs  |  479 +++++++
 .../Amqp/Message/AmqpNmsObjectMessageFacade.cs     |   95 ++
 .../Amqp/Message/AmqpNmsStreamMessageFacade.cs     |  161 +++
 .../Amqp/Message/AmqpNmsTextMessageFacade.cs       |  106 ++
 .../Amqp/Message/AmqpSerializedObjectDelegate.cs   |  105 ++
 .../Amqp/Message/AmqpTypedObjectDelegate.cs        |  107 ++
 .../Amqp/Message/IAmqpObjectTypeDelegate.cs}       |   28 +-
 src/NMS.AMQP/Provider/Failover/FailoverProvider.cs |  673 ++++++++++
 .../Failover/FailoverProviderFactory.cs}           |   44 +-
 src/NMS.AMQP/Provider/Failover/FailoverRequest.cs  |  136 ++
 .../Failover/FailoverUriPool.cs}                   |   76 +-
 .../IMessageQueue.cs => Provider/IProvider.cs}     |   63 +-
 .../IProviderFactory.cs}                           |   14 +-
 src/NMS.AMQP/Provider/IProviderListener.cs         |   74 ++
 src/NMS.AMQP/Provider/ProviderFactory.cs           |   49 +
 src/NMS.AMQP/Queue.cs                              |  140 --
 src/NMS.AMQP/RemoveSubscriptionLink.cs             |  165 ---
 src/NMS.AMQP/Session.cs                            |  920 -------------
 src/NMS.AMQP/SessionDispatcher.cs                  |   65 +
 src/NMS.AMQP/TemporaryLink.cs                      |  130 --
 .../ITextMessageCloak.cs => TerminusDurability.cs} |   18 +-
 src/NMS.AMQP/Topic.cs                              |  138 --
 ...nsportContext.cs => ISecureTransportContext.cs} |    8 +-
 ...derTransportContext.cs => ITransportContext.cs} |   24 +-
 .../{Secure/AMQP => }/SecureTransportContext.cs    |   42 +-
 .../Transport/{AMQP => }/TransportContext.cs       |   91 +-
 ...nsportContext.cs => TransportContextFactory.cs} |   42 +-
 src/NMS.AMQP/Util/AmqpDestinationHelper.cs         |  240 ++++
 .../Util/{AtomicSequence.cs => AtomicBool.cs}      |   43 +-
 src/NMS.AMQP/Util/AtomicSequence.cs                |    2 +-
 src/NMS.AMQP/Util/DispatchExecutor.cs              |  733 -----------
 src/NMS.AMQP/Util/IdGenerator.cs                   |    4 +-
 src/NMS.AMQP/Util/LinkCache.cs                     |  190 ---
 src/NMS.AMQP/Util/MessageSupport.cs                |  321 +----
 src/NMS.AMQP/Util/PriorityMessageQueue.cs          |  141 ++
 src/NMS.AMQP/Util/PropertyUtil.cs                  |  490 +------
 src/NMS.AMQP/Util/SymbolUtil.cs                    |    4 +-
 src/NMS.AMQP/Util/TaskUtil.cs                      |   81 --
 .../Util/Types/Map/AMQP/AMQPPrimitiveMap.cs        |    5 +-
 src/NMS.AMQP/Util/Types/Map/PrimitiveMapBase.cs    |   78 +-
 src/NMS.AMQP/Util/Types/Queue/FIFOMessageQueue.cs  |  103 --
 src/NMS.AMQP/Util/Types/Queue/MessageQueueBase.cs  |  251 ----
 .../Util/Types/Queue/PriorityMessageQueue.cs       |  173 ---
 .../IBytesMessageCloak.cs => Util/URISupport.cs}   |   27 +-
 src/NMS.AMQP/Util/UriUtil.cs                       |   71 -
 src/StructuredMessage/StructuredMessage.cs         |   35 +-
 src/StructuredMessage/StructuredMessage.csproj     |    1 +
 .../Apache-NMS-AMQP-Test.csproj                    |   18 +-
 test/Apache-NMS-AMQP-Test/AtomicBoolTest.cs        |   70 +
 test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs |  110 ++
 .../AmqpAcknowledgmentsIntegrationTest.cs          |  288 +++++
 .../Integration/ConnectionIntegrationTest.cs       |  248 ++++
 .../Integration/ConsumerIntegrationTest.cs         |  873 +++++++++++++
 .../Integration/FailoverIntegrationTest.cs         |  749 +++++++++++
 .../Integration/ProducerIntegrationTest.cs         |  638 +++++++++
 .../Integration/SessionIntegrationTest.cs          |  160 +++
 .../Integration/SubscriptionsIntegrationTest.cs    |   83 ++
 .../Integration/TemporaryQueueIntegrationTest.cs   |  114 ++
 .../Integration/TemporaryTopicIntegrationTest.cs   |  113 ++
 .../Message/Facade/NmsTestBytesMessageFacade.cs    |   89 ++
 .../Message/Facade/NmsTestMapMessageFacade.cs      |   18 +-
 .../Message/Facade/NmsTestMessageFacade.cs         |   83 ++
 .../Message/Facade/NmsTestObjectMessageFacade.cs   |   28 +-
 .../Message/Facade/NmsTestStreamMessageFacade.cs   |   61 +
 .../Message/Facade/NmsTestTextMessageFacade.cs     |   35 +-
 .../Message/Facade/TestMessageFactory.cs           |   75 ++
 .../Message/Foreign/ForeignNmsBytesMessage.cs      |  155 +++
 .../Message/Foreign/ForeignNmsMapMessage.cs        |   21 +-
 .../Message/Foreign/ForeignNmsMessage.cs           |  106 ++
 .../Message/Foreign/ForeignNmsObjectMessage.cs     |   26 +-
 .../Message/Foreign/ForeignNmsStreamMessage.cs     |  148 +++
 .../Message/Foreign/ForeignNmsTextMessage.cs       |   26 +-
 .../Message/NmsBytesMessageTest.cs                 |  538 ++++++++
 .../Message/NmsMapMessageTest.cs                   |   54 +
 .../Apache-NMS-AMQP-Test/Message/NmsMessageTest.cs |  442 +++++++
 .../Message/NmsMessageTransformationTest.cs        |  315 +++++
 .../Message/NmsObjectMessageTest.cs                |  138 ++
 .../Message/NmsStreamMessageTest.cs                | 1019 +++++++++++++++
 .../Message/NmsTextMessageTest.cs                  |  143 ++
 test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs     |  148 +++
 .../Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs |  134 ++
 .../Apache-NMS-AMQP-Test/NmsQueueTest.cs           |   46 +-
 .../Apache-NMS-AMQP-Test/NmsTopicTest.cs           |   46 +-
 .../PriorityMessageQueueTest.cs                    |  221 ++++
 test/Apache-NMS-AMQP-Test/PropertyUtilTest.cs      |   78 ++
 .../Provider/Amqp/AmqpCodecTest.cs                 |  585 +++++++++
 .../Provider/Amqp/AmqpMessageFactoryTest.cs        |  175 +++
 .../Provider/Amqp/AmqpMessageIdHelperTest.cs       |   61 +
 .../Provider/Amqp/AmqpNmsBytesMessageFacadeTest.cs |  536 ++++++++
 .../Provider/Amqp/AmqpNmsMapMessageFacadeTest.cs   |  232 ++++
 .../Provider/Amqp/AmqpNmsMessageFacadeTest.cs      | 1361 ++++++++++++++++++++
 .../Provider/Amqp/AmqpNmsMessageTypesTestCase.cs   |  145 +++
 .../Amqp/AmqpNmsObjectMessageFacadeTest.cs         |  372 ++++++
 .../Amqp/AmqpNmsStreamMessageFacadeTest.cs         |  297 +++++
 .../Provider/Amqp/AmqpNmsTextMessageFacadeTest.cs  |  242 ++++
 .../Provider/Amqp/AmqpProviderFactoryTest.cs       |   58 +
 .../Provider/Amqp/AmqpProviderTest.cs              |   46 +-
 .../Provider/FailoverProviderFactoryTest.cs        |   88 ++
 .../Provider/FailoverProviderTest.cs               |  295 +++++
 .../Provider/FailoverUriPoolTest.cs                |   78 ++
 .../Provider/Mock/MockProvider.cs                  |  129 ++
 .../Provider/Mock/MockProviderConfiguration.cs     |   16 +-
 .../Provider/Mock/MockProviderFactory.cs           |   48 +-
 .../Provider/Mock/MockProviderStats.cs             |   97 ++
 .../Provider/Mock/MockRemotePeer.cs                |   29 +-
 .../Test/Attribute/ConnectionSetup.cs              |    0
 .../Test/Attribute/ConsumerSetup.cs                |    0
 .../Test/Attribute/DestinationSetup.cs             |    0
 .../Test/Attribute/ProducerSetup.cs                |    0
 .../Test/Attribute/SessionSetup.cs                 |    0
 .../Test/Attribute/TestSetup.cs                    |    0
 .../Test/TestCase/BaseTestCase.cs                  |    0
 .../Test/TestCase/ConnectionTest.cs                |    0
 .../Test/TestCase/ConsumerTest.cs                  |    0
 .../Test/TestCase/MessageTest.cs                   |    0
 .../Test/TestCase/ProducerTest.cs                  |    0
 .../Test/TestCase/SecureConnectionTest.cs          |    6 +-
 .../Test/TestCase/SessionTest.cs                   |    0
 .../Test/Util/NMSLogger.cs                         |    0
 .../Test/Util/TestConfig.cs                        |    0
 .../TestAmqp/MockLinkEndpoint.cs                   |   51 +
 .../TestAmqp/MockLinkProcessor.cs                  |   31 +-
 .../TestAmqp/TerminusDurability.cs                 |   16 +-
 test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs |   89 ++
 .../TestAmqp/TestLinkEndpoint.cs                   |   38 +-
 .../TestAmqp/TestLinkProcessor.cs                  |   73 ++
 test/Apache-NMS-AMQP-Test/TestAmqp/TestListener.cs |  341 +++++
 .../TestAmqp/TestMessageProcessor.cs               |   29 +-
 .../TestAmqp/TestMessageSource.cs                  |   88 ++
 .../TestAmqp/TestRequestProcessor.cs               |   20 +-
 test/{ => Apache-NMS-AMQP-Test}/TestSuite.config   |    0
 .../Transport/TransportContextFactoryTest.cs       |   86 ++
 test/Apache-NMS-AMQP-Test/URISupportTest.cs        |   61 +
 .../config/Adapter.runsettings                     |    0
 .../config/cert/ReadMe.md                          |    0
 .../config/cert/broker.crt                         |    0
 .../config/cert/broker.key                         |    0
 test/{ => Apache-NMS-AMQP-Test}/config/cert/ca.crt |    0
 test/{ => Apache-NMS-AMQP-Test}/config/cert/ca.key |    0
 .../config/cert/client.crt                         |    0
 .../config/cert/client.key                         |    0
 .../config/cert/client_trust.jks                   |  Bin
 .../config/cert/nms_test_broker.jks                |  Bin
 .../config/cert/nms_test_broker.p12                |  Bin
 test/NMS-AMQP.Test.csproj.user                     |    6 -
 222 files changed, 21924 insertions(+), 12608 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7cf535c..412b85f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -259,3 +259,4 @@ ModelManifest.xml
 
 # FAKE - F# Make
 .fake/
+.idea/
diff --git a/apache-nms-amqp.sln b/apache-nms-amqp.sln
index 1da756d..3f33096 100644
--- a/apache-nms-amqp.sln
+++ b/apache-nms-amqp.sln
@@ -1,12 +1,12 @@
 Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio 15
-VisualStudioVersion = 15.0.26730.10
+# Visual Studio Version 16
+VisualStudioVersion = 16.0.28922.388
 MinimumVisualStudioVersion = 10.0.40219.1
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP", "src\NMS.AMQP\Apache-NMS-AMQP.csproj", "{0D8CF699-9702-4EC6-8719-C2968D32F09A}"
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloWorld", "src\HelloWorld\HelloWorld.csproj", "{A56349BE-ED66-4E18-B5FE-E3EA069D2ADD}"
 EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP-Test", "test\Apache-NMS-AMQP-Test.csproj", "{DF402917-D85D-421C-A7E2-DC3DF371B9CB}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache-NMS-AMQP-Test", "test\Apache-NMS-AMQP-Test\Apache-NMS-AMQP-Test.csproj", "{DF402917-D85D-421C-A7E2-DC3DF371B9CB}"
 	ProjectSection(ProjectDependencies) = postProject
 		{0D8CF699-9702-4EC6-8719-C2968D32F09A} = {0D8CF699-9702-4EC6-8719-C2968D32F09A}
 	EndProjectSection
diff --git a/src/HelloWorld/HelloWorld.cs b/src/HelloWorld/HelloWorld.cs
index 2e10cec..d17a2db 100644
--- a/src/HelloWorld/HelloWorld.cs
+++ b/src/HelloWorld/HelloWorld.cs
@@ -17,6 +17,7 @@
 using System;
 using System.Collections.Specialized;
 using Apache.NMS;
+using Apache.NMS.AMQP;
 using CommandLine;
 
 namespace HelloWorld
@@ -65,20 +66,6 @@ namespace HelloWorld
             Uri providerUri = new Uri(ip);
             Console.WriteLine("scheme: {0}", providerUri.Scheme);
 
-            StringDictionary properties = new StringDictionary();
-            if (opts.username != null)
-            {
-                properties["NMS.username"] = opts.username;
-            }
-            if (opts.password != null)
-            {
-                properties["NMS.password"] = opts.password;
-            }
-            if (opts.clientId != null)
-            {
-                properties["NMS.clientId"] = opts.clientId;
-            }
-            properties["NMS.sendtimeout"] = opts.connTimeout+"";
             IConnection conn = null;
             if (opts.topic == null && opts.queue == null)
             {
@@ -87,8 +74,24 @@ namespace HelloWorld
             }
             try
             {
-                Apache.NMS.AMQP.NMSConnectionFactory providerFactory = new Apache.NMS.AMQP.NMSConnectionFactory(providerUri, properties);
-                IConnectionFactory factory = providerFactory.ConnectionFactory;
+                NmsConnectionFactory factory = new NmsConnectionFactory(ip);
+                if (opts.username != null)
+                {
+                    factory.UserName = opts.username;
+                }
+                if (opts.password != null)
+                {
+                    factory.Password = opts.password;
+                }
+                if (opts.clientId != null)
+                {
+                    factory.ClientId = opts.clientId;
+                }
+
+                if (opts.connTimeout != default)
+                {
+                    factory.SendTimeout = opts.connTimeout;
+                }
 
                 Console.WriteLine("Creating Connection...");
                 conn = factory.CreateConnection();
diff --git a/src/HelloWorld/HelloWorld.csproj b/src/HelloWorld/HelloWorld.csproj
index 22fd330..e2176fe 100644
--- a/src/HelloWorld/HelloWorld.csproj
+++ b/src/HelloWorld/HelloWorld.csproj
@@ -21,6 +21,7 @@ under the License.
     <OutputType>Exe</OutputType>
     <RootNamespace>HelloWorld</RootNamespace>
     <AssemblyName>HelloWorld</AssemblyName>
+    <LangVersion>7.3</LangVersion>
   </PropertyGroup>
 
   <ItemGroup>
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index b4fe988..a4074aa 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -22,6 +22,7 @@ with the License.  You may obtain a copy of the License at
     <TargetFrameworks>netstandard2.0</TargetFrameworks>
     <RootNamespace>Apache.NMS.AMQP</RootNamespace>
     <AssemblyName>Apache.NMS.AMQP</AssemblyName>
+    <LangVersion>7.3</LangVersion>
   </PropertyGroup>
 
   <PropertyGroup>
@@ -63,5 +64,6 @@ with the License.  You may obtain a copy of the License at
     <!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
     <PackageReference Include="AMQPNetLite.Core" Version="2.1.7" />
     <PackageReference Include="Apache.NMS" Version="1.8.0" />
+    <PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
   </ItemGroup>
 </Project>
diff --git a/src/NMS.AMQP/Connection.cs b/src/NMS.AMQP/Connection.cs
deleted file mode 100644
index c7af40f..0000000
--- a/src/NMS.AMQP/Connection.cs
+++ /dev/null
@@ -1,1160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Collections.Specialized;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Amqp;
-using Amqp.Framing;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Transport;
-using System.Reflection;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP
-{
-    using Message.Factory;
-    enum ConnectionState
-    {
-        UNKNOWN = -1,
-        INITIAL = 0,
-        CONNECTING = 1,
-        CONNECTED = 2,
-        CLOSING = 3,
-        CLOSED = 4,
-
-    }
-
-    /// <summary>
-    /// Apache.NMS.AMQP.Connection facilitates management and creates the underlying Amqp.Connection protocol engine object.
-    /// Apache.NMS.AMQP.Connection is also the NMS.AMQP.Session Factory.
-    /// </summary>
-    class Connection : NMSResource<ConnectionInfo>, Apache.NMS.IConnection
-    {
-        public static readonly string MESSAGE_OBJECT_SERIALIZATION_PROP = PropertyUtil.CreateProperty("Message.Serialization", ConnectionFactory.ConnectionPropertyPrefix);
-        
-        private IRedeliveryPolicy redeliveryPolicy;
-        private Amqp.IConnection impl;
-        private ProviderCreateConnection implCreate;
-        private ConnectionInfo connInfo;
-        private readonly IdGenerator clientIdGenerator;
-        private Atomic<bool> clientIdCanSet = new Atomic<bool>(true);
-        private Atomic<bool> closing = new Atomic<bool>(false);
-        private Atomic<ConnectionState> state = new Atomic<ConnectionState>(ConnectionState.INITIAL);
-        private CountDownLatch latch;
-        private ConcurrentDictionary<Id, Session> sessions = new ConcurrentDictionary<Id, Session>();
-        private IdGenerator sesIdGen = null;
-        private IdGenerator tempTopicIdGen = null;
-        private IdGenerator tempQueueIdGen = null;
-        private StringDictionary properties;
-        private TemporaryLinkCache temporaryLinks = null;
-        private IProviderTransportContext transportContext = null;
-        private DispatchExecutor exceptionExecutor = null;
-
-        #region Contructor
-
-        internal Connection(Uri addr, IdGenerator clientIdGenerator)
-        {
-            connInfo = new ConnectionInfo();
-            connInfo.remoteHost = addr;
-            Info = connInfo;
-            this.clientIdGenerator = clientIdGenerator;
-            latch = new CountDownLatch(1);
-            temporaryLinks = new TemporaryLinkCache(this);
-        }
-        
-        #endregion
-
-        #region Internal Properties
-
-        internal Amqp.IConnection InnerConnection { get { return this.impl; } }
-
-        internal IdGenerator SessionIdGenerator
-        {
-            get
-            {
-                IdGenerator sig = sesIdGen;
-                lock (this)
-                {
-                    if (sig == null)
-                    {
-                        sig = new NestedIdGenerator("ID:ses", connInfo.Id, true);
-                        sesIdGen = sig;
-                    }
-                }
-                return sig;
-            }
-        }
-
-        internal IdGenerator TemporaryTopicGenerator
-        {
-            get
-            {
-                IdGenerator ttg = tempTopicIdGen;
-                lock (this)
-                {
-                    if (ttg == null)
-                    {
-                        ttg = new NestedIdGenerator("ID:nms-temp-topic", Info.Id, true);
-                        tempTopicIdGen = ttg;
-                    }
-                }
-                return ttg;
-            }
-        }
-
-        internal IdGenerator TemporaryQueueGenerator
-        {
-            get
-            {
-                IdGenerator tqg = tempQueueIdGen;
-                lock (this)
-                {
-                    if (tqg == null)
-                    {
-                        tqg = new NestedIdGenerator("ID:nms-temp-queue", Info.Id, true);
-                        tempQueueIdGen = tqg;
-                    }
-                }
-                return tqg;
-            }
-        }
-
-        internal bool IsConnected
-        {
-            get
-            {
-                return this.state.Value.Equals(ConnectionState.CONNECTED);
-            }
-        }
-
-        internal bool IsClosed
-        {
-            get
-            {
-                return this.state.Value.Equals(ConnectionState.CLOSED);
-            }
-        }
-
-        internal ushort MaxChannel
-        {
-            get { return connInfo.channelMax; }
-        }
-
-        internal MessageTransformation TransformFactory
-        {
-            get
-            {
-                return MessageFactory<ConnectionInfo>.Instance(this).GetTransformFactory();
-            }
-        }
-
-        internal IMessageFactory MessageFactory
-        {
-            get
-            {
-                return MessageFactory<ConnectionInfo>.Instance(this);
-            }
-        }
-
-        internal string TopicPrefix
-        {
-            get { return connInfo.TopicPrefix; }
-        }
-
-        internal string QueuePrefix
-        {
-            get { return connInfo.QueuePrefix; }
-        }
-
-        internal bool IsAnonymousRelay
-        {
-            get { return connInfo.IsAnonymousRelay; }
-        }
-
-        internal bool IsDelayedDelivery
-        {
-            get { return connInfo.IsDelayedDelivery; }
-        }
-        
-        #endregion
-
-        #region Internal Methods
-
-        internal ITemporaryTopic CreateTemporaryTopic()
-        {
-            TemporaryTopic temporaryTopic = new TemporaryTopic(this);
-
-            CreateTemporaryLink(temporaryTopic);
-
-            return temporaryTopic;
-        }
-
-        internal ITemporaryQueue CreateTemporaryQueue()
-        {
-            TemporaryQueue temporaryQueue = new TemporaryQueue(this);
-
-            CreateTemporaryLink(temporaryQueue);
-
-            return temporaryQueue;
-        }
-
-        private void CreateTemporaryLink(TemporaryDestination temporaryDestination)
-        {
-            TemporaryLink link = new TemporaryLink(temporaryLinks.Session, temporaryDestination);
-
-            link.Attach();
-
-            temporaryLinks.AddLink(temporaryDestination, link);
-
-        }
-
-        /// <summary>
-        /// Unsubscribes Durable Consumers on the connection
-        /// </summary>
-        /// <param name="name">The subscription name.</param>
-        internal void Unsubscribe(string name)
-        {
-            // check for any active consumers on the subscription name.
-            foreach (Session session in GetSessions())
-            {
-                if (session.ContainsSubscriptionName(name))
-                {
-                    throw new IllegalStateException("Cannot unsubscribe from Durable Consumer while consuming messages.");
-                }
-            }
-            // unsubscribe using an instance of RemoveSubscriptionLink.
-            RemoveSubscriptionLink removeLink = new RemoveSubscriptionLink(this.temporaryLinks.Session, name);
-            removeLink.Unsubscribe();
-        }
-
-        internal bool ContainsSubscriptionName(string name)
-        {
-            foreach (Session session in GetSessions())
-            {
-                if (session.ContainsSubscriptionName(name))
-                {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        internal void Configure(ConnectionFactory cf)
-        {
-            Amqp.ConnectionFactory cfImpl = cf.Factory as Amqp.ConnectionFactory;
-            
-            // get properties from connection factory
-            StringDictionary properties = cf.ConnectionProperties;
-
-            // apply connection properties to connection factory and connection info.
-            PropertyUtil.SetProperties(cfImpl.AMQP, properties, ConnectionFactory.ConnectionPropertyPrefix);
-            PropertyUtil.SetProperties(connInfo, properties, ConnectionFactory.ConnectionPropertyPrefix);
-            
-            // create copy of transport context
-            this.transportContext = cf.Context.Copy();
-
-            // Store raw properties for future objects
-            this.properties = PropertyUtil.Clone(properties);
-            
-            // Create Connection builder delegate.
-            this.implCreate = this.transportContext.CreateConnectionBuilder();
-        }
-
-        internal StringDictionary Properties
-        {
-            get { return PropertyUtil.Merge(this.properties, PropertyUtil.GetProperties(this.connInfo)); }
-        }
-
-        internal void Remove(TemporaryDestination destination)
-        {
-            temporaryLinks.RemoveLink(destination);
-        }
-
-        internal void DestroyTemporaryDestination(TemporaryDestination destination)
-        {
-            ThrowIfClosed();
-            foreach(Session session in GetSessions())
-            {
-                if (session.IsDestinationInUse(destination))
-                {
-                    throw new IllegalStateException("Cannot delete Temporary Destination, {0}, while consuming messages.");
-                }
-            }
-            try
-            {
-                TemporaryLink link = temporaryLinks.RemoveLink(destination);
-                if(link != null && !link.IsClosed)
-                {
-                    link.Close();
-                }
-            }
-            catch (Exception e)
-            {
-                throw ExceptionSupport.Wrap(e);
-            }
-        }
-
-        internal void Remove(Session ses)
-        {
-            Session result = null;
-            if(!sessions.TryRemove(ses.Id, out result))
-            {
-                Tracer.WarnFormat("Could not disassociate Session {0} with Connection {1}.", ses.Id, ClientId);
-            }
-        }
-
-        private Session[] GetSessions()
-        {   
-            return sessions.Values.ToArray();
-        }
-
-        private void CheckIfClosed()
-        {
-            if (this.state.Value.Equals(ConnectionState.CLOSED))
-            {
-                throw new IllegalStateException("Operation invalid on closed connection.");
-            }
-        }
-
-        private void ProcessCapabilities(Open openResponse)
-        {
-            if(openResponse.OfferedCapabilities != null || openResponse.OfferedCapabilities.Length > 0)
-            {
-                foreach(Amqp.Types.Symbol symbol in openResponse.OfferedCapabilities)
-                {
-                    if (SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY.Equals(symbol))
-                    {
-                        connInfo.IsAnonymousRelay = true;
-                    }
-                    else if (SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY.Equals(symbol))
-                    {
-                        connInfo.IsDelayedDelivery = true;
-                    }
-                    else
-                    {
-                        connInfo.AddCapability(symbol);
-                    }
-                }
-            }
-        }
-
-        private void ProcessRemoteConnectionProperties(Open openResponse)
-        {
-            if (openResponse.Properties != null && openResponse.Properties.Count > 0)
-            {
-                foreach(object key in openResponse.Properties.Keys)
-                {
-                    string keyString = key.ToString();
-                    string valueString = openResponse.Properties[key]?.ToString();
-                    this.connInfo.RemotePeerProperies.Add(keyString, valueString);
-                }
-            }
-        }
-
-        private void OpenResponse(Amqp.IConnection conn, Open openResp)
-        {
-            Tracer.InfoFormat("Connection {0}, Open {0}", conn.ToString(), openResp.ToString());
-            Tracer.DebugFormat("Open Response : \n Hostname = {0},\n ContainerId = {1},\n MaxChannel = {2},\n MaxFrame = {3}\n", openResp.HostName, openResp.ContainerId, openResp.ChannelMax, openResp.MaxFrameSize);
-            Tracer.DebugFormat("Open Response Descriptor : \n Descriptor Name = {0},\n Descriptor Code = {1}\n", openResp.Descriptor.Name, openResp.Descriptor.Code);
-            ProcessCapabilities(openResp);
-            ProcessRemoteConnectionProperties(openResp);
-            if (SymbolUtil.CheckAndCompareFields(openResp.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
-            {
-                Tracer.InfoFormat("Open response contains {0} property the connection {1} will soon be closed.", SymbolUtil.CONNECTION_ESTABLISH_FAILED, this.ClientId);
-            }
-            else
-            {
-                object value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
-                if(value != null && value is string)
-                {
-                    this.connInfo.TopicPrefix = value as string;
-                }
-                value = SymbolUtil.GetFromFields(openResp.Properties, SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX);
-                if (value != null && value is string)
-                {
-                    this.connInfo.QueuePrefix = value as string;
-                }
-                this.latch?.countDown();
-            }
-        }
-
-        private Open CreateOpenFrame(ConnectionInfo connInfo)
-        {
-            Open frame = new Open();
-            frame.ContainerId = connInfo.clientId;
-            frame.ChannelMax = connInfo.channelMax;
-            frame.MaxFrameSize = Convert.ToUInt32(connInfo.maxFrameSize);
-            frame.HostName = connInfo.remoteHost.Host;
-            frame.IdleTimeOut = Convert.ToUInt32(connInfo.idleTimout);
-            frame.DesiredCapabilities = new Amqp.Types.Symbol[] {
-                SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
-                SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
-                SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY
-            };
-
-            return frame;
-        }
-
-        internal void Connect()
-        {
-            if (this.state.CompareAndSet(ConnectionState.INITIAL, ConnectionState.CONNECTING))
-            {
-                Address addr = UriUtil.ToAddress(connInfo.remoteHost, connInfo.username, connInfo.password ?? string.Empty);
-                Tracer.InfoFormat("Creating Address: {0}", addr.Host);
-                if (this.clientIdCanSet.CompareAndSet(true, false))
-                {
-                    if (this.ClientId == null)
-                    {
-                        connInfo.ResourceId = this.clientIdGenerator.GenerateId();
-                    }
-                    else
-                    {
-                        connInfo.ResourceId = new Id(ClientId);
-                    }
-                    Tracer.InfoFormat("Staring Connection with Client Id : {0}", this.ClientId);
-                }
-                
-                Open openFrame = CreateOpenFrame(this.connInfo);
-
-                this.latch = new CountDownLatch(1);
-
-                Task<Amqp.Connection> fconn = this.implCreate(addr, openFrame, this.OpenResponse);
-                // wait until the Open request is sent
-                this.impl = TaskUtil.Wait(fconn, connInfo.connectTimeout);
-                if(fconn.Exception != null)
-                {
-                    // exceptions thrown from TaskUtil are typically System.AggregateException and are usually transport exceptions for secure transport.
-                    // extract the innerException of interest and wrap it as an NMS exception
-                    if (fconn.Exception is AggregateException)
-                    {
-                        throw ExceptionSupport.Wrap(fconn.Exception.InnerException,
-                               "Failed to connect host {0}. Cause: {1}", openFrame.HostName, fconn.Exception.InnerException?.Message ?? fconn.Exception.Message);
-                    }
-                    else {
-                        throw ExceptionSupport.Wrap(fconn.Exception, 
-                               "Failed to connect host {0}. Cause: {1}", openFrame.HostName, fconn.Exception?.Message);
-                    }
-                }
-
-                this.impl.Closed += OnInternalClosed;
-                this.impl.AddClosedCallback(OnInternalClosed);
-
-                ConnectionState finishedState = ConnectionState.UNKNOWN;
-                // Wait for Open response 
-                try
-                {
-                    bool received = this.latch.await((this.Info.requestTimeout==0) ? Timeout.InfiniteTimeSpan : this.RequestTimeout);
-                    if (received && this.impl.Error == null && fconn.Exception == null)
-                    {
-
-                        Tracer.InfoFormat("Connection {0} has connected.", this.impl.ToString());
-                        finishedState = ConnectionState.CONNECTED;
-                        // register connection factory once client Id accepted.
-                        MessageFactory<ConnectionInfo>.Register(this);
-                    }
-                    else
-                    {
-                        if (!received)
-                        {
-                            // Timeout occured waiting on response
-                            Tracer.InfoFormat("Connection Response Timeout. Failed to receive response from {0} in {1}ms", addr.Host, this.Info.requestTimeout);
-                        }
-                        finishedState = ConnectionState.INITIAL;
-                        
-                        if (fconn.Exception == null)
-                        {
-                            if (!received) throw ExceptionSupport.GetTimeoutException(this.impl, "Connection {0} has failed to connect in {1}ms.", ClientId, connInfo.closeTimeout);
-                            Tracer.ErrorFormat("Connection {0} has Failed to connect. Message: {1}", ClientId, (this.impl.Error == null ? "Unknown" : this.impl.Error.ToString()));
-
-                            throw ExceptionSupport.GetException(this.impl, "Connection {0} has failed to connect.", ClientId);
-                        }
-                        else
-                        {
-                            throw ExceptionSupport.Wrap(fconn.Exception, "Connection {0} failed to connect.", ClientId);
-                        }
-
-                    }
-                }
-                finally
-                {
-                    this.latch = null;
-                    this.state.GetAndSet(finishedState);
-                    if (finishedState != ConnectionState.CONNECTED)
-                    {
-                        this.impl.Close(TimeSpan.FromMilliseconds(connInfo.closeTimeout),null);
-                    }
-                }
-            }
-        }
-
-        private void Shutdown()
-        {
-            foreach(Session s in GetSessions())
-            {
-                s.Shutdown();
-            }
-            // signals to the DispatchExecutor to stop enqueue exception notifications
-            // and drain off remaining notifications.
-            this.exceptionExecutor?.Shutdown();
-        }
-
-        private void OnInternalClosed(IAmqpObject sender, Error error)
-        {
-            string name = null;
-            Connection self = null;
-            try
-            {
-                self = this;
-                // name should throw should the finalizer of the Connection object already completed.
-                name = self.ClientId;
-                Tracer.InfoFormat("Received Close Request for Connection {0}.", name);
-                if (self.state.CompareAndSet(ConnectionState.CONNECTED, ConnectionState.CLOSED))
-                {
-                    // unexpected or amqp transport close.
-
-                    // notify any error to exception Dispatcher.
-                    if (error != null)
-                    {
-                        NMSException nmse = ExceptionSupport.GetException(error, "Connection {0} Closed.", name);
-                        self.OnException(nmse);
-                    }
-
-                    // shutdown connection facilities.
-                    if (self.IsStarted)
-                    {
-                        self.Shutdown();
-                    }
-                    MessageFactory<ConnectionInfo>.Unregister(self);
-                }
-                else if (self.state.CompareAndSet(ConnectionState.CLOSING, ConnectionState.CLOSED))
-                {
-                    // application close.
-                    MessageFactory<ConnectionInfo>.Unregister(self);
-                }
-                self.latch?.countDown();
-            }
-            catch (Exception ex)
-            {
-                Tracer.DebugFormat("Caught Exception during Amqp Connection close for NMS Connection{0}. Exception {1}",
-                    name != null ? (" " + name) : "", ex);
-            }
-        }
-
-        private void Disconnect()
-        {
-            if(this.state.CompareAndSet(ConnectionState.CONNECTED, ConnectionState.CLOSING) && this.impl!=null)
-            {
-                Tracer.InfoFormat("Sending Close Request On Connection {0}.", ClientId);
-                try
-                {
-                    if (!this.impl.IsClosed)
-                    {
-                        this.impl.Close(TimeSpan.FromMilliseconds(connInfo.closeTimeout), null);
-                    }
-                }
-                catch (AmqpException amqpEx)
-                {
-                    throw ExceptionSupport.Wrap(amqpEx, "Error Closing Amqp Connection " + ClientId);
-                }
-                catch (TimeoutException tmoutEx)
-                {
-                    throw ExceptionSupport.GetTimeoutException(this.impl, "Timeout waiting for Amqp Connection {0} Close response. Message : {1}", ClientId, tmoutEx.Message);
-                }
-                finally
-                {
-                    if (this.state.CompareAndSet(ConnectionState.CLOSING, ConnectionState.CLOSED))
-                    {
-                        // connection cleanup.
-                        MessageFactory<ConnectionInfo>.Unregister(this);
-                        this.impl = null;
-                    }
-                    
-                }
-            }
-        }
-        
-        protected DispatchExecutor ExceptionExecutor
-        {
-            get
-            {
-                if(exceptionExecutor == null && !IsClosed)
-                {
-                    exceptionExecutor = new DispatchExecutor(true);
-                    exceptionExecutor.Start();
-                }
-                return exceptionExecutor;
-            }
-        }
-
-        internal void OnException(Exception ex)
-        {
-            Apache.NMS.ExceptionListener listener = this.ExceptionListener;
-            if(listener != null)
-            {
-                ExceptionNotification en = new ExceptionNotification(this, ex);
-                this.ExceptionExecutor.Enqueue(en);
-            }
-        }
-
-        protected void DispatchException(Exception ex)
-        {
-            Apache.NMS.ExceptionListener listener = this.ExceptionListener;
-            if (listener != null)
-            {
-                // Wrap does nothing if this is already a NMS exception, otherwise
-                // wrap it appropriately.
-                listener(ExceptionSupport.Wrap(ex));
-            }
-            else
-            {
-                Tracer.WarnFormat("Received Async exception. Type {0} Message {1}", ex.GetType().Name, ex.Message);
-                Tracer.DebugFormat("Async Exception Stack {0}", ex);
-            }
-        }
-
-        private class ExceptionNotification : DispatchEvent
-        {
-            private readonly Connection connection;
-            private readonly Exception exception;
-            public ExceptionNotification(Connection owner, Exception ex)
-            {
-                connection = owner;
-                exception = ex;
-                Callback = this.Nofify;
-            }
-
-            public override void OnFailure(Exception e)
-            {
-                base.OnFailure(e);
-                connection.DispatchException(e);
-            }
-
-            private void Nofify()
-            {
-                connection.DispatchException(exception);
-            }
-
-        }
-        
-        #endregion
-
-        #region IConnection methods
-
-        AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-        /// <summary>
-        /// Sets the <see cref="Apache.NMS.AcknowledgementMode"/> for the <see cref="Apache.NMS.ISession"/> 
-        /// objects created by the connection.
-        /// </summary>
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return acknowledgementMode; }
-            set { acknowledgementMode = value; }
-        }
-        
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.ClientId"/>.
-        /// </summary>
-        public string ClientId
-        {
-            get { return connInfo.clientId; }
-            set
-            {
-                if (this.clientIdCanSet.Value)
-                {
-                    if (value != null && value.Length > 0)
-                    {
-                        connInfo.clientId = value;
-                        try
-                        {
-                            this.Connect();
-                        }
-                        catch (NMSException nms)
-                        {
-                            NMSException ex = nms;
-                            if (nms.Message.Contains("invalid-field:container-id"))
-                            {
-                                ex = new InvalidClientIDException(nms.Message);
-                            }
-                            throw ex;
-                        }
-                    }
-                }
-                else
-                {
-                    throw new InvalidClientIDException("Client Id can not be set after connection is Started.");
-                }
-            }
-        }
-
-        /// <summary>
-        /// Throws <see cref="NotImplementedException"/>.
-        /// </summary>
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get => throw new NotImplementedException();
-            set => throw new NotImplementedException();
-        }
-
-        /// <summary>
-        /// Throws <see cref="NotImplementedException"/>.
-        /// </summary>
-        public ProducerTransformerDelegate ProducerTransformer
-        {
-            get => throw new NotImplementedException();
-            set => throw new NotImplementedException();
-        }
-
-        /// <summary>
-        /// AMQP Provider access to remote connection properties. 
-        /// This is used by <see cref="ConnectionProviderUtilities.GetRemotePeerConnectionProperties(Apache.NMS.IConnection)"/>.
-        /// </summary>
-        public StringDictionary RemotePeerProperties
-        {
-            get { return PropertyUtil.Clone(this.Info.RemotePeerProperies); }
-        }
-
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.MetaData"/>.
-        /// </summary>
-        public IConnectionMetaData MetaData
-        {
-            get
-            {
-                return ConnectionMetaData.Version;
-            }
-        }
-
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.RedeliveryPolicy"/>.
-        /// </summary>
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get { return this.redeliveryPolicy; }
-            set
-            {
-                if (value != null)
-                {
-                    this.redeliveryPolicy = value;
-                }
-            }
-        }
-
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.RequestTimeout"/>.
-        /// </summary>
-        public TimeSpan RequestTimeout
-        {
-            get
-            {
-                return TimeSpan.FromMilliseconds(this.connInfo.requestTimeout);
-            }
-
-            set
-            {
-                connInfo.requestTimeout = Convert.ToInt64(value.TotalMilliseconds);
-            }
-        }
-
-        /// <summary>
-        /// Not Implemented, throws <see cref="NotImplementedException"/>.
-        /// </summary>
-        public event ConnectionInterruptedListener ConnectionInterruptedListener
-        {
-            add => throw new NotImplementedException("AMQP Provider does not implement ConnectionInterruptedListener events.");
-            remove => throw new NotImplementedException("AMQP Provider does not implement ConnectionInterruptedListener events.");
-        }
-
-        /// <summary>
-        /// Not Implemented, throws <see cref="NotImplementedException"/>.
-        /// </summary>
-        public event ConnectionResumedListener ConnectionResumedListener
-        {
-            add => throw new NotImplementedException("AMQP Provider does not implement ConnectionResumedListener events.");
-            remove => throw new NotImplementedException("AMQP Provider does not implement ConnectionResumedListener events.");
-        }
-
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.ExceptionListener"/>.
-        /// </summary>
-        public event ExceptionListener ExceptionListener;
-
-        /// <summary>
-        /// Creates a <see cref="Apache.NMS.ISession"/> with 
-        /// the connection <see cref="Apache.NMS.IConnection.AcknowledgementMode"/>.
-        /// </summary>
-        /// <returns>An <see cref="Apache.NMS.ISession"/> provider instance.</returns>
-        public Apache.NMS.ISession CreateSession()
-        {
-            return CreateSession(acknowledgementMode);
-        }
-
-        /// <summary>
-        /// Creates a <see cref="Apache.NMS.ISession"/> with the given <see cref="Apache.NMS.AcknowledgementMode"/> parameter.
-        /// <para>
-        /// Throws <see cref="NotImplementedException"/> for the <see cref="Apache.NMS.AcknowledgementMode.Transactional"/>.
-        /// </para>
-        /// </summary>
-        /// <param name="acknowledgementMode"></param>
-        /// <returns></returns>
-        public Apache.NMS.ISession CreateSession(AcknowledgementMode acknowledgementMode)
-        {
-            this.CheckIfClosed();
-            this.Connect();
-            Session ses = new Session(this);
-            ses.AcknowledgementMode = acknowledgementMode;
-            try
-            {
-                ses.Begin();
-            }
-            catch(NMSException) { throw; }
-            catch(Exception ex)
-            {
-                throw ExceptionSupport.Wrap(ex, "Failed to establish amqp Session.");
-            }
-
-            if(!this.sessions.TryAdd(ses.Id, ses))
-            {
-                Tracer.ErrorFormat("Failed to add Session {0}.", ses.Id);
-            }
-            Tracer.InfoFormat("Created Session {0} on connection {1}.", ses.Id, this.ClientId);
-            return ses;
-        }
-
-        /// <summary>
-        /// Destroys all temporary destinations for the connection. 
-        /// <para>
-        /// Throws <see cref="IllegalStateException"/> should any Temporary Topic or Temporary Queue in 
-        /// the connection have an active consumer using it.
-        /// </para>
-        /// </summary>
-        public void PurgeTempDestinations()
-        {
-            foreach(TemporaryDestination temp in temporaryLinks.Keys.ToArray())
-            {
-                this.DestroyTemporaryDestination(temp);
-            }
-        }
-
-        /// <summary>
-        /// See <see cref="Apache.NMS.IConnection.Close"/>.
-        /// </summary>
-        public void Close()
-        {
-            Dispose(true);
-            if (this.IsClosed)
-            {
-                GC.SuppressFinalize(this);
-            }
-        }
-
-        #endregion
-
-        #region IDisposable Methods
-
-
-        protected virtual void Dispose(bool disposing)
-        {
-            if (IsClosed) return;
-            Tracer.DebugFormat("Closing of Connection {0}", ClientId);
-            if (disposing)
-            {   
-                // full shutdown
-                if (this.closing.CompareAndSet(false, true))
-                {
-                    Session[] connectionSessions = GetSessions();
-                    foreach (Session s in connectionSessions)
-                    {
-                        try
-                        {
-                            s.CheckOnDispatchThread();
-                        }
-                        catch
-                        {
-                            this.closing.Value = false;
-                            throw;
-                        }
-                    }
-
-                    this.Stop();
-
-                    this.temporaryLinks.Close();
-
-                    try
-                    {
-                        this.Disconnect();
-                    }
-                    catch (Exception ex)
-                    {
-                        // log network errors
-                        NMSException nmse = ExceptionSupport.Wrap(ex, "Amqp Connection close failure for NMS Connection {0}", this.Id);
-                        Tracer.DebugFormat("Caught Exception while closing Amqp Connection {0}. Exception {1}", this.Id, nmse);
-                    }
-                    finally
-                    {
-                        sessions?.Clear();
-                        sessions = null;
-                        if (this.state.Value.Equals(ConnectionState.CLOSED))
-                        {
-                            if (this.exceptionExecutor != null)
-                            {
-                                this.exceptionExecutor.Close();
-                                this.exceptionExecutor = null;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        public void Dispose()
-        {
-            try
-            {
-                this.Close();
-            }
-            catch (Exception ex)
-            {
-                Tracer.DebugFormat("Caught Exception while Disposing of NMS Connection {0}. Exception {1}", this.ClientId, ex);
-            }
-        }
-
-        #endregion
-
-        #region NMSResource Methods
-
-        public override bool IsStarted { get { return !mode.Value.Equals(Resource.Mode.Stopped); } }
-
-        protected override void ThrowIfClosed()
-        {
-            this.CheckIfClosed();
-        }
-
-        protected override void StartResource()
-        {
-            this.Connect();
-
-            if (!IsConnected)
-            {
-                throw new NMSConnectionException("Connection Failed to connect to Client.");
-            }
-
-            //start sessions here
-            foreach (Session s in GetSessions())
-            {
-                s.Start();
-            }
-            
-        }
-
-        protected override void StopResource()
-        {
-            if ( this.impl != null && !this.impl.IsClosed)
-            {
-                // stop all sessions here.
-                foreach (Session s in GetSessions())
-                {
-                    s.Stop();
-                }
-            }
-        }
-
-        #endregion
-
-        public override string ToString()
-        {
-            return "Connection:\nConnection Info:"+connInfo.ToString();
-        }
-
-        
-    }
-
-    #region Connection Provider Utilities
-
-    /// <summary>
-    /// This give access to provider specific functions and capabilities for a provider connection.
-    /// </summary>
-    public static class ConnectionProviderUtilities
-    {
-        public static bool IsAMQPConnection(Apache.NMS.IConnection connection)
-        {
-            return connection != null && connection is Apache.NMS.AMQP.Connection;
-        }
-
-        public static StringDictionary GetRemotePeerConnectionProperties(Apache.NMS.IConnection connection)
-        {
-            if (connection == null)
-            {
-                return null;
-            }
-            else if (connection is Apache.NMS.AMQP.Connection)
-            {
-                return (connection as Apache.NMS.AMQP.Connection).RemotePeerProperties;
-            }
-            return null;
-        }
-    }
-
-    #endregion
-
-    #region Connection Information inner Class
-
-    internal class ConnectionInfo : ResourceInfo
-    {
-        static ConnectionInfo()
-        {
-            Amqp.ConnectionFactory defaultCF = new Amqp.ConnectionFactory();
-            AmqpSettings defaultAMQPSettings = defaultCF.AMQP;
-            
-            DEFAULT_CHANNEL_MAX = defaultAMQPSettings.MaxSessionsPerConnection;
-            DEFAULT_MAX_FRAME_SIZE = defaultAMQPSettings.MaxFrameSize;
-            DEFAULT_IDLE_TIMEOUT = defaultAMQPSettings.IdleTimeout;
-            
-            DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
-
-        }
-        public const long INFINITE = -1;
-        public const long DEFAULT_CONNECT_TIMEOUT = 15000;
-        public const int DEFAULT_CLOSE_TIMEOUT = 15000;
-        public static readonly long DEFAULT_REQUEST_TIMEOUT;
-        public static readonly long DEFAULT_IDLE_TIMEOUT;
-
-        public static readonly ushort DEFAULT_CHANNEL_MAX;
-        public static readonly int DEFAULT_MAX_FRAME_SIZE;
-
-        public ConnectionInfo() : this(null) { }
-        public ConnectionInfo(Id clientId) : base(clientId)
-        {
-            if (clientId != null)
-                this.clientId = clientId.ToString();
-        }
-
-        private Id ClientId = null;
-
-        public override Id Id
-        {
-            get
-            {
-                if (base.Id == null)
-                {
-                    if (ClientId == null && clientId != null)
-                    {
-                        ClientId = new Id(clientId);
-                    }
-                    return ClientId;
-                }
-                else
-                {
-                    return base.Id;
-                }
-            }
-        }
-
-        internal Id ResourceId
-        {
-            set
-            {
-                if(ClientId == null && value != null)
-                {
-                    ClientId = value;
-                    clientId = ClientId.ToString();
-                }
-                
-            }
-        }
-
-        internal Uri remoteHost { get; set; }
-        public string clientId { get; internal set; } = null;
-        public string username { get; set; } = null;
-        public string password { get; set; } = null;
-
-        public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
-        public long connectTimeout { get; set; } = DEFAULT_CONNECT_TIMEOUT;
-        public int closeTimeout { get; set; } = DEFAULT_CLOSE_TIMEOUT;
-        public long idleTimout { get; set; } = DEFAULT_IDLE_TIMEOUT;
-
-        public ushort channelMax { get; set; } = DEFAULT_CHANNEL_MAX;
-        public int maxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
-
-        public string TopicPrefix { get; internal set; } = null;
-
-        public string QueuePrefix { get; internal set; } = null;
-
-        public bool IsAnonymousRelay { get; internal set; } = false;
-
-        public bool IsDelayedDelivery { get; internal set; } = false;
-
-        public Message.Cloak.AMQPObjectEncodingType? EncodingType { get; internal set; } = null;
-
-
-        public IList<string> Capabilities { get { return new List<string>(capabilities); } }
-
-        public bool HasCapability(string capability)
-        {
-            return capabilities.Contains(capability);
-        }
-
-        public void AddCapability(string capability)
-        {
-            if (capability != null && capability.Length > 0)
-                capabilities.Add(capability);
-        }
-
-        public StringDictionary RemotePeerProperies { get => remoteConnectionProperties; }
-
-        private StringDictionary remoteConnectionProperties = new StringDictionary();
-        private List<string> capabilities = new List<string>();
-
-        public override string ToString()
-        {
-            string result = "";
-            result += "connInfo = [\n";
-            foreach (MemberInfo info in this.GetType().GetMembers())
-            {
-                if (info is PropertyInfo)
-                {
-                    PropertyInfo prop = info as PropertyInfo;
-
-                    if (prop.GetGetMethod(true).IsPublic)
-                    {
-                        if (prop.GetGetMethod(true).ReturnParameter.ParameterType.IsEquivalentTo(typeof(List<string>)))
-                        {
-                            result += string.Format("{0} = {1},\n", prop.Name, PropertyUtil.ToString(prop.GetValue(this,null) as IList));
-                        }
-                        else
-                        {
-                            result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
-                        }
-
-                    }
-                }
-            }
-            result = result.Substring(0, result.Length - 2) + "\n]";
-            return result;
-        }
-
-    }
-
-    #endregion
-
-}
diff --git a/src/NMS.AMQP/ConnectionFactory.cs b/src/NMS.AMQP/ConnectionFactory.cs
index ba9e9c8..4bc722d 100644
--- a/src/NMS.AMQP/ConnectionFactory.cs
+++ b/src/NMS.AMQP/ConnectionFactory.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,452 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
-using System.Collections;
-using System.Collections.Specialized;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.Policies;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Transport;
-using Apache.NMS.AMQP.Transport.AMQP;
-using Apache.NMS.AMQP.Transport.Secure;
-using Apache.NMS.AMQP.Transport.Secure.AMQP;
-using System.Security.Cryptography.X509Certificates;
-using System.Net.Security;
-using System.Security.Authentication;
-using Amqp;
 
 namespace Apache.NMS.AMQP
 {
-    internal delegate Task<Amqp.Connection> ProviderCreateConnection(Amqp.Address addr, Amqp.Framing.Open open, Amqp.OnOpened onOpened);
-    /// <summary>
-    /// Apache.NMS.AMQP.ConnectionFactory implements Apache.NMS.IConnectionFactory.
-    /// Apache.NMS.AMQP.ConnectionFactory creates, manages and configures the Amqp.ConnectionFactory used to create Amqp Connections.
-    /// </summary>
-    public class ConnectionFactory : Apache.NMS.IConnectionFactory
+    public class ConnectionFactory : NmsConnectionFactory
     {
-
-        public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";
-        internal static readonly string CLIENT_ID_PROP = PropertyUtil.CreateProperty("ClientId", "", ConnectionPropertyPrefix);
-        internal static readonly string USERNAME_PROP = PropertyUtil.CreateProperty("UserName", "", ConnectionPropertyPrefix);
-        internal static readonly string PASSWORD_PROP = PropertyUtil.CreateProperty("Password", "", ConnectionPropertyPrefix);
-
-        internal const string ConnectionPropertyPrefix = "connection.";
-        internal const string ConnectionPropertyAlternativePrefix = PropertyUtil.PROPERTY_PREFIX;
-        internal const string TransportPropertyPrefix = "transport.";
-
-        private Amqp.Address amqpHost = null;
-        private Uri brokerUri;
-        private string clientId;
-        private IdGenerator clientIdGenerator = new IdGenerator();
-        
-        private StringDictionary properties = new StringDictionary();
-        private StringDictionary applicationProperties = null;
-
-        private TransportPropertyInterceptor transportProperties;
-        private ConnectionFactoryPropertyInterceptor connectionProperties;
-        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-        
-        private Amqp.ConnectionFactory impl;
-        private TransportContext transportContext;
-
-        #region Constructor Methods
-
         public ConnectionFactory()
-            : this(DEFAULT_BROKER_URL)
-        {
-        }
-
-        public ConnectionFactory(string brokerUri)
-            : this(URISupport.CreateCompatibleUri(brokerUri), null, null)
-        {
-
-        }
-
-        public ConnectionFactory(string brokerUri, string clientId)
-            : this(URISupport.CreateCompatibleUri(brokerUri), clientId, null)
-        {
-        }
-
-        public ConnectionFactory(Uri brokerUri)
-            : this(brokerUri, null, null)
-        { }
-
-        public ConnectionFactory(Uri brokerUri, StringDictionary props)
-            : this(brokerUri, null, props)
-        { }
-
-        public ConnectionFactory(Uri brokerUri, string clientId, StringDictionary props)
-        {
-            impl = new Amqp.ConnectionFactory();
-            this.clientId = clientId;
-            if (props != null)
-            {
-                this.InitApplicationProperties(props);
-            }
-            BrokerUri = brokerUri;
-            impl.AMQP.HostName = BrokerUri.Host;
-            //
-            // Set up tracing in AMQP.  We capture all AMQP traces in the TraceListener below
-            // and map to NMS 'Tracer' logs as follows:
-            //    AMQP          Tracer
-            //    Verbose       Debug
-            //    Frame         Debug
-            //    Information   Info
-            //    Output        Info    (should not happen)
-            //    Warning       Warn
-            //    Error         Error
-            //
-            Amqp.Trace.TraceLevel = Amqp.TraceLevel.Verbose | Amqp.TraceLevel.Frame;
-            Amqp.Trace.TraceListener = (level, format, args) =>
-            {
-                switch (level)
-                {
-                    case Amqp.TraceLevel.Verbose:
-                    case Amqp.TraceLevel.Frame:
-                        Tracer.DebugFormat(format, args);
-                        break;
-                    case Amqp.TraceLevel.Information:
-                    case Amqp.TraceLevel.Output:
-                        // 
-                        // Applications should not access AmqpLite directly so there
-                        // should be no 'Output' level logs.
-                        Tracer.InfoFormat(format, args);
-                        break;
-                    case Amqp.TraceLevel.Warning:
-                        Tracer.WarnFormat(format, args);
-                        break;
-                    case Amqp.TraceLevel.Error:
-                        Tracer.ErrorFormat(format, args);
-                        break;
-                    default:
-                        Tracer.InfoFormat("Unknown AMQP LogLevel: {}", level);
-                        Tracer.InfoFormat(format, args);
-                        break;
-                }
-            };
-
-        }
-
-        #endregion
-
-        #region Connection Factory Properties
-
-        internal bool IsClientIdSet
-        {
-            get => this.clientId == null;
-        }
-
-        public string ClientId
-        {
-            get { return this.clientId; }
-            internal set
-            {
-                this.clientId = value;
-            }
-        }
-
-
-        private IdGenerator ClientIDGenerator
-        {
-            get
-            {
-                IdGenerator cig = clientIdGenerator;
-                lock (this)
-                {
-                    if (cig == null)
-                    {
-                        clientIdGenerator = new IdGenerator();
-                        cig = clientIdGenerator;
-                    }
-                }
-                return cig;
-            }
-        }
-
-        internal Amqp.IConnectionFactory Factory { get => this.impl; }
-
-        internal IProviderTransportContext Context { get => this.transportContext; }
-
-        #endregion
-
-        #region IConnection Members
-
-        public Uri BrokerUri
         {
-            get { return brokerUri; }
-            set
-            {
-                brokerUri = value;
-                if (value != null)
-                {
-                    amqpHost = UriUtil.ToAddress(value);
-                }
-                else
-                {
-                    amqpHost = null;
-                }   
-                InitTransportProperties();
-                UpdateConnectionProperties();
-            }
         }
 
-        public ConsumerTransformerDelegate ConsumerTransformer
+        public ConnectionFactory(Uri brokerUri) : base(brokerUri)
         {
-            get => throw new NotImplementedException();
-            set => throw new NotImplementedException();
         }
 
-        public ProducerTransformerDelegate ProducerTransformer
+        public ConnectionFactory(string brokerUri) : base(brokerUri)
         {
-            get => throw new NotImplementedException();
-            set => throw new NotImplementedException();
-        }
-
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get
-            {
-                if (redeliveryPolicy == null)
-                {
-                    this.redeliveryPolicy = new RedeliveryPolicy();
-                }
-                return this.redeliveryPolicy;
-            }
-            set
-            {
-                if (value != null)
-                {
-                    this.redeliveryPolicy = value;
-                }
-            }
-        }
-
-        public Apache.NMS.IConnection CreateConnection()
-        {
-            try
-            {
-                Connection conn = new Connection(brokerUri, ClientIDGenerator);
-
-                Tracer.Info("Configuring Connection Properties");
-
-                bool shouldSetClientID = this.clientId != null;
-
-                conn.Configure(this);
-
-                if (shouldSetClientID)
-                {
-                    conn.ClientId = this.clientId;
-
-                    conn.Connect();
-                }
-
-                return conn;
-
-            }
-            catch (Exception ex)
-            {
-                if (ex is NMSException)
-                {
-                    throw ex;
-                }
-                else
-                {
-                    throw new NMSException(ex.Message, ex);
-                }
-            }
-        }
-
-        public Apache.NMS.IConnection CreateConnection(string userName, string password)
-        {
-
-            if(ConnectionProperties.ContainsKey(USERNAME_PROP))
-            {
-                ConnectionProperties[USERNAME_PROP] = userName;
-            }
-            else
-            {
-                ConnectionProperties.Add(USERNAME_PROP, userName);
-            }
-
-            if (ConnectionProperties.ContainsKey(PASSWORD_PROP))
-            {
-                ConnectionProperties[PASSWORD_PROP] = password;
-            }
-            else
-            {
-                ConnectionProperties.Add(PASSWORD_PROP, password);
-            }
-
-            return CreateConnection();
-        }
-
-
-
-        #endregion
-        #region AMQP Connection Properties
-        public Amqp.TraceLevel AMQPlogLevel
-        {
-            get { return this.AMQPlogLevel; }
-            set
-            {
-                if (null != this.transportContext)
-                {
-                    this.AMQPlogLevel = value;
-                    Amqp.Trace.TraceLevel = value;
-                }
-
-            }
-        }
-        #endregion
-
-        #region SSLConnection Methods
-
-        public RemoteCertificateValidationCallback CertificateValidationCallback
-        {
-            get
-            {
-                return (IsSSL) ? (transportContext as IProviderSecureTransportContext).ServerCertificateValidateCallback : null;
-            }
-            set
-            {
-                if (IsSSL)
-                {
-                    (transportContext as IProviderSecureTransportContext).ServerCertificateValidateCallback = value;
-                }
-            }
-        }
-
-        public LocalCertificateSelectionCallback LocalCertificateSelect
-        {
-            get
-            {
-                return (IsSSL) ? (transportContext as IProviderSecureTransportContext).ClientCertificateSelectCallback : null;
-            }
-            set
-            {
-                if (IsSSL)
-                {
-                    (transportContext as IProviderSecureTransportContext).ClientCertificateSelectCallback = value;
-                }
-            }
-        }
-
-        public bool IsSSL
-        {
-            get
-            {
-                return amqpHost?.UseSsl ?? false;
-            }
         }
         
-        private void InitTransportProperties()
+        public ConnectionFactory(string userName, string password, string brokerUri) : base(userName, password, brokerUri)
         {
-            if (IsSSL)
-            {
-                SecureTransportContext stc = new SecureTransportContext(this);
-                this.transportContext = stc;
-            }
-            else
-            {
-                this.transportContext = new TransportContext(this);
-            }
-            
-            StringDictionary queryProps = URISupport.ParseParameters(this.brokerUri);
-            StringDictionary transportProperties = URISupport.GetProperties(queryProps, TransportPropertyPrefix);
-            if (this.applicationProperties != null)
-            {
-                StringDictionary appTProps = URISupport.GetProperties(this.applicationProperties, TransportPropertyPrefix);
-                transportProperties = PropertyUtil.Merge(transportProperties, appTProps, string.Empty, string.Empty, TransportPropertyPrefix);
-            }
-            PropertyUtil.SetProperties(this.transportContext, transportProperties, TransportPropertyPrefix);
-            if (IsSSL)
-            {
-                this.transportProperties = new SecureTransportPropertyInterceptor(this.transportContext as IProviderSecureTransportContext, transportProperties);
-            }
-            else
-            {
-                this.transportProperties = new TransportPropertyInterceptor(this.transportContext, transportProperties);
-            }
         }
-
-        private void InitApplicationProperties(StringDictionary props)
-        {
-            // copy properties to temporary dictionary
-            StringDictionary result = PropertyUtil.Clone(props);
-            // extract connections properties
-            StringDictionary connProps = ExtractConnectionProperties(result);
-            // initialize applications properties as the union of temp and conn properties
-            this.applicationProperties = PropertyUtil.Merge(result, connProps, "", "", "");
-
-        }
-
-        private StringDictionary ExtractConnectionProperties(StringDictionary rawProps)
-        {
-            // find and extract properties with ConnectionPropertyPrefix
-            StringDictionary connectionProperties = URISupport.ExtractProperties(rawProps, ConnectionPropertyPrefix);
-            // find and extract properties with ConnectionPropertyAlternativePrefix
-            StringDictionary connectionAlternativeProperties = URISupport.ExtractProperties(rawProps, ConnectionPropertyAlternativePrefix);
-            // return Union of Conn and AltConn properties prefering Conn over AltConn.
-            return PropertyUtil.Merge(connectionProperties, connectionAlternativeProperties, ConnectionPropertyPrefix, ConnectionPropertyAlternativePrefix, ConnectionPropertyPrefix);
-        }
-
-        private StringDictionary CreateConnectionProperties(StringDictionary rawProps)
-        {
-            // read properties with ConnectionPropertyPrefix
-            StringDictionary connectionProperties = URISupport.GetProperties(rawProps, ConnectionPropertyPrefix);
-            // read properties with ConnectionPropertyAlternativePrefix
-            StringDictionary connectionAlternativeProperties = URISupport.GetProperties(rawProps, ConnectionPropertyAlternativePrefix);
-            // return Union of Conn and AltConn properties prefering Conn over AltConn.
-            return PropertyUtil.Merge(connectionProperties, connectionAlternativeProperties, ConnectionPropertyPrefix, ConnectionPropertyAlternativePrefix, ConnectionPropertyPrefix);
-        }
-
-        private void UpdateConnectionProperties()
-        {
-            StringDictionary queryProps = URISupport.ParseParameters(this.brokerUri);
-            StringDictionary brokerConnectionProperties = CreateConnectionProperties(queryProps);
-            if (this.applicationProperties != null)
-            {
-                // combine connection properties with application properties prefering URI properties over application
-                this.properties = PropertyUtil.Merge(brokerConnectionProperties, applicationProperties, "", "", "");
-            }
-            else
-            {
-                this.properties = brokerConnectionProperties;
-            }
-            // update connection factory members.
-            connectionProperties = new ConnectionFactoryPropertyInterceptor(this, this.properties);
-        }
-        #endregion
-
-        #region Connection Factory Property Methods
-        
-        public StringDictionary TransportProperties
-        {
-            get { return this.transportProperties; }
-        }
-
-        #endregion
-
-        #region Connection Properties Methods
-
-        public StringDictionary ConnectionProperties
-        {
-            get { return this.connectionProperties; }
-        }
-
-        public bool HasConnectionProperty(string key)
-        {
-            return this.properties.ContainsKey(key);
-        }
-
-        #endregion
     }
-
-    
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/ConnectionMetaData.cs b/src/NMS.AMQP/ConnectionMetaData.cs
index ecbf5f9..70fdd4f 100644
--- a/src/NMS.AMQP/ConnectionMetaData.cs
+++ b/src/NMS.AMQP/ConnectionMetaData.cs
@@ -70,7 +70,7 @@ namespace Apache.NMS.AMQP
         private readonly int NMSMinor;
         private ConnectionMetaData()
         {
-            Assembly assembly = Assembly.GetAssembly(typeof(ConnectionFactory));
+            Assembly assembly = Assembly.GetAssembly(typeof(NmsConnectionFactory));
             AssemblyVersion = assembly.GetName().Version.ToString();
 
             ProviderName = assembly.GetName().Name;
diff --git a/src/NMS.AMQP/Destination.cs b/src/NMS.AMQP/Destination.cs
deleted file mode 100644
index a998b4d..0000000
--- a/src/NMS.AMQP/Destination.cs
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP
-{
-    #region Destination Implementation
-    /// <summary>
-    /// Apache.NMS.AMQP.Destination implements Apache.NMS.IDestination
-    /// Destionation is an abstract container for a Queue or Topic.
-    /// </summary>
-    abstract class Destination : IDestination
-    {
-
-        protected string destinationName;
-        protected Connection connection;
-        private readonly bool queue;
-
-        #region Constructor
-
-        internal Destination(Connection conn, string name, bool isQ)
-        {
-            queue = isQ;
-            ValidateName(name);
-            destinationName = name;
-            connection = conn;
-            
-        }
-
-        internal Destination(Destination other)
-        {
-            this.queue = other.queue;
-            destinationName = other.destinationName;
-            connection = other.connection;
-        }
-
-        #endregion
-
-        #region Abstract Methods
-
-        protected abstract void ValidateName(string name);
-
-        #endregion
-
-        #region IDestination Properties
-
-        public virtual DestinationType DestinationType
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public virtual bool IsQueue
-        {
-            get
-            {
-                return queue;
-            }
-        }
-
-        public virtual bool IsTemporary
-        {
-            get
-            {
-                return false;
-            }
-        }
-
-        public virtual bool IsTopic
-        {
-            get
-            {
-                return !queue;
-            }
-        }
-
-        #endregion
-
-        #region IDisposable Methods
-
-        public void Dispose()
-        {
-            try
-            {
-                this.Dispose(true);
-            }
-            catch (Exception ex)
-            {
-                Tracer.DebugFormat("Caught Exception while disposing of {0} : {1}. Exception : {2}",
-                    this.DestinationType, this.destinationName, ex);
-            }
-        }
-
-        protected virtual void Dispose(bool disposing)
-        {
-
-        }
-
-        #endregion
-
-        #region Object Comparison Methods
-
-        public override string ToString()
-        {
-            return base.ToString() + ":" + destinationName;
-        }
-
-        public virtual bool Equals (Destination other)
-        {
-            return this.DestinationType == other.DestinationType && this.destinationName.Equals(other.destinationName);
-        }
-
-        public virtual bool Equals (IDestination destination)
-        {
-            if (this.DestinationType == destination.DestinationType)
-            {
-                if (destination is Destination)
-                {
-                    return this.Equals(destination as Destination);
-                }
-                else
-                {
-                    string destName = destination.IsTopic ? (destination as ITopic).TopicName : (destination as IQueue).QueueName;
-                    return (destName != null && destName.Length > 0) ? destName.CompareTo(this.destinationName) == 0 : false;
-                }
-            }
-            return false;
-        }
-
-        public override bool Equals(object obj)
-        {
-            if (obj != null && obj is IDestination)
-            {
-                return this.Equals(obj as IDestination);
-            }
-            return false;
-        }
-
-        public override int GetHashCode()
-        {
-            return destinationName.GetHashCode() * 31 + DestinationType.GetHashCode();
-        }
-
-        #endregion
-    }
-
-    #endregion
-
-    #region Temporary Destination Implementation
-
-    /// <summary>
-    /// Apache.NMS.AMQP.TemporaryDestination inherits NMS.AMQP.Destination
-    /// Destionation is an abstract container for a Temporary Queue or Temporary Topic.
-    /// </summary>
-    abstract class TemporaryDestination : Destination
-    {
-        
-        private readonly Id destinationId;
-
-        private bool deleted = false;
-
-        #region Constructor
-
-        public TemporaryDestination(Connection conn, Id name, bool isQ) : base(conn, name.ToString(), isQ)
-        {
-            destinationId = name;
-        }
-
-        public TemporaryDestination(Connection conn, string name, bool isQ) : base(conn, name, isQ)
-        {
-            destinationId = new Id(name);
-        }
-
-        #endregion
-
-        #region Temporary Destination Properties
-
-        internal Connection Connection
-        {
-            get { return connection; }
-        }
-
-        internal Id DestinationId
-        {
-            get
-            {
-                return destinationId;
-            }
-        }
-        
-        internal bool IsDeleted { get => deleted; }
-
-        internal string DestinationName
-        {
-            get => base.destinationName;
-            set => base.destinationName = value;
-        }
-
-        #endregion
-
-        #region Temporary Destination Methods
-
-        public virtual void Delete()
-        {
-            if (connection != null)
-            {
-                this.connection.DestroyTemporaryDestination(this);
-                connection = null;
-            }
-            deleted = true;
-        }
-
-        #endregion
-
-        #region IDestination Methods
-
-        public override bool IsTemporary
-        {
-            get
-            {
-                return true;
-            }
-        }
-
-        #endregion
-
-        #region IDisposable Methods
-
-        protected override void Dispose(bool disposing)
-        {
-            if (disposing)
-            {
-                this.Delete();
-            }
-        }
-        
-        #endregion
-
-        #region Object Comparison Methods
-
-        public override int GetHashCode()
-        {
-            return destinationId.GetHashCode();
-        }
-
-        public override bool Equals(Destination other)
-        {
-            if(other is TemporaryDestination)
-            {
-                return (other as TemporaryDestination).destinationId.Equals(this.destinationId) 
-                    || base.Equals(other);
-            }
-            return base.Equals(other);
-        }
-
-        #endregion
-
-    }
-
-    #endregion
-
-    #region Destination Transformation
-
-    internal class DestinationTransformation
-    {
-        public static Destination Transform(Connection connection, IDestination destination)
-        {
-            Destination transformDestination = null;
-            if (destination == null)
-                return null;
-
-            if (destination is Destination)
-            {
-                return destination as Destination;
-            }
-            string destinationName = null;
-
-            DestinationType type = destination.DestinationType;
-            switch (type)
-            {
-                case DestinationType.Queue:
-                case DestinationType.TemporaryQueue:
-                    destinationName = (destination as IQueue).QueueName;
-                    break;
-                case DestinationType.Topic:
-                case DestinationType.TemporaryTopic:
-                    destinationName = (destination as ITopic).TopicName;
-                    break;
-                default:
-                    throw new NMSException(string.Format("Unresolved destination. Unrecognized destination Type {0} for IDesintation {1}", type, destination?.ToString()));
-            }
-
-            if(destinationName == null)
-            {
-                throw new NMSException(string.Format("Unresolved destination. Could not resolved destination name for destination {0} type {1}.", destination?.ToString(), type));
-            }
-
-            switch (type)
-            {
-                case DestinationType.Queue:
-                    transformDestination = new Queue(connection, destinationName);
-                    break;
-                case DestinationType.TemporaryQueue:
-                    transformDestination = new TemporaryQueue(connection, destinationName);
-                    break;
-                case DestinationType.Topic:
-                    transformDestination = new Topic(connection, destinationName);
-                    break;
-                case DestinationType.TemporaryTopic:
-                    transformDestination = new TemporaryTopic(connection, destinationName);
-                    break;
-            }
-
-            return transformDestination;
-        }
-    }
-
-    #endregion
-}
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/INmsConnectionListener.cs
similarity index 65%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/INmsConnectionListener.cs
index 118b5ab..a29d90b 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/INmsConnectionListener.cs
@@ -14,23 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP
 {
-    internal interface IBytesMessageCloak : IMessageCloak
+    public interface INmsConnectionListener
     {
-        BinaryReader getDataReader();
-        BinaryWriter getDataWriter();
-
-        new IBytesMessageCloak Copy();
-
-        int BodyLength { get; }
-        void Reset();
+        void OnConsumerClosed(IMessageConsumer consumer, Exception exception);
+        void OnConnectionEstablished(Uri remoteUri);
+        void OnConnectionRestored(Uri remoteUri);
+        void OnConnectionFailure(NMSException exception);
+        void OnConnectionInterrupted(Uri remoteUri);
+        void OnProducerClosed(NmsMessageProducer messageProducer, Exception error);
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs
deleted file mode 100644
index 09d9ea9..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPBytesMessageCloak.cs
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    
-    using Cloak;
-    using Util;
-    using Factory;
-    class AMQPBytesMessageCloak : AMQPMessageCloak, IBytesMessageCloak
-    {
-
-        private static readonly Data EMPTY_DATA;
-
-        static AMQPBytesMessageCloak()
-        {
-            EMPTY_DATA = new Data();
-            EMPTY_DATA.Binary = new byte[0];
-        }
-
-        private EndianBinaryReader byteIn=null;
-        private EndianBinaryWriter byteOut=null;
-
-        internal AMQPBytesMessageCloak(Connection c) : base(c)
-        {
-            Content = null;
-        }
-
-        internal AMQPBytesMessageCloak(MessageConsumer c, Amqp.Message msg) : base (c, msg) { }
-
-        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_BYTE; } }
-
-        public override byte[] Content
-        {
-            get
-            {   
-                return this.GetBinaryFromBody().Binary;
-            }
-            set
-            {
-                Data result = EMPTY_DATA;
-                if (value != null && value.Length>0)
-                {
-                    result = new Data();
-                    result.Binary = value;
-                }
-                this.message.BodySection = result;
-                base.Content = result.Binary;
-            }
-        }
-
-        public int BodyLength
-        {
-            get
-            {
-                
-                return Content != null ? Content.Length : -1;
-            }
-        }
-
-        public BinaryReader getDataReader()
-        {
-            if(byteOut != null)
-            {
-                throw new IllegalStateException("Cannot read message while writing.");
-            }
-            if (byteIn == null)
-            {
-                byte[] data = Content;
-                if (Content == null)
-                {
-                    data = EMPTY_DATA.Binary;
-                }
-                Stream dataStream = new MemoryStream(data, false);
-                    
-                byteIn = new EndianBinaryReader(dataStream);
-            }
-            return byteIn;
-        }
-        public BinaryWriter getDataWriter()
-        {
-            if (byteIn != null)
-            {
-                throw new IllegalStateException("Cannot write message while reading.");
-            }
-            if (byteOut == null)
-            {
-                MemoryStream outputBuffer = new MemoryStream();
-                this.byteOut = new EndianBinaryWriter(outputBuffer);
-                message.BodySection = EMPTY_DATA;
-
-            }
-            return byteOut;
-        }
-
-        public void Reset()
-        {
-            if (byteOut != null)
-            {
-
-                MemoryStream byteStream = new MemoryStream((int)byteOut.BaseStream.Length);
-                byteOut.BaseStream.Position = 0;
-                byteOut.BaseStream.CopyTo(byteStream);
-                
-                byte[] value = byteStream.ToArray();
-                Content = value;
-                
-                byteStream.Close();
-                byteOut.Close();
-                byteOut = null;
-            }
-            if (byteIn != null)
-            {
-                byteIn.Close();
-                byteIn = null;
-            }
-        }
-
-        IBytesMessageCloak IBytesMessageCloak.Copy()
-        {
-            IBytesMessageCloak bcloak = new AMQPBytesMessageCloak(connection);
-            this.CopyInto(bcloak);
-            return bcloak;
-        }
-
-        public override void ClearBody()
-        {
-            this.Reset();
-            Content = null;
-        }
-
-        protected override void CopyInto(IMessageCloak msg)
-        {
-            base.CopyInto(msg);
-            this.Reset();
-            IBytesMessageCloak bmsg = msg as IBytesMessageCloak;
-            bmsg.Content = this.Content;
-            
-            
-        }
-
-        private Data GetBinaryFromBody()
-        {
-            RestrictedDescribed body = message.BodySection;
-            Data result = EMPTY_DATA;
-            if(body == null)
-            {
-                return result;
-            }
-            else if (body is Data)
-            {
-                byte[] binary = (body as Data).Binary;
-                if(binary != null && binary.Length != 0)
-                {
-                    return body as Data;
-                }
-            }
-            else if (body is AmqpValue)
-            {
-                object value = (body as AmqpValue).Value;
-                if(value == null) { return result; }
-                if(value is byte[])
-                {
-                    byte[] dataValue = value as byte[];
-                    if (dataValue.Length > 0)
-                    {
-                        result = new Data();
-                        result.Binary = dataValue;
-                    }
-                }
-                else
-                {
-                    throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
-                }
-            }
-            else
-            {
-                throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
-            }
-
-            return result;
-        }
-        public override string ToString()
-        {
-            string result = base.ToString();
-            if (this.Content != null)
-            {
-                result += string.Format("\nMessage Body: {0}\n", this.Content.ToString());
-            }
-            return result;
-        }
-    }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs
deleted file mode 100644
index 9754783..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMapMessageCloak.cs
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    using Cloak;
-    using Factory;
-    using Util;
-    using Util.Types;
-    using Util.Types.Map.AMQP;
-    class AMQPMapMessageCloak : AMQPMessageCloak, IMapMessageCloak
-    {
-        private IPrimitiveMap map = null;
-        private Map amqpmap = null;
-        
-
-        internal AMQPMapMessageCloak(Connection conn) : base(conn)
-        {
-            InitializeMapBody();
-        }
-
-        internal AMQPMapMessageCloak(MessageConsumer c, Amqp.Message msg) : base(c, msg)
-        {
-            InitializeMapBody();
-        }
-
-        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MAP; } }
-
-        private void InitializeMapBody()
-        {
-            if (message.BodySection == null)
-            {
-                amqpmap = new Map();
-                map = new AMQPValueMap(amqpmap);
-                AmqpValue val = new AmqpValue();
-                val.Value = amqpmap;
-                message.BodySection = val;
-            }
-            else 
-            {
-                if (message.BodySection is AmqpValue)
-                {
-                    object obj = (message.BodySection as AmqpValue).Value;
-                    if (obj == null)
-                    {
-                        amqpmap = new Map();
-                        map = new AMQPValueMap(amqpmap);
-                        (message.BodySection as AmqpValue).Value = amqpmap;
-                    }
-                    else if (obj is Map)
-                    {
-                        amqpmap = obj as Map;
-                        map = new AMQPValueMap(amqpmap);
-                    }
-                    else
-                    {
-                        throw new NMSException(string.Format("Invalid message body value type. Type: {0}.", obj.GetType().Name));
-                    }
-                }
-                else
-                {
-                    throw new NMSException("Invalid message body type.");
-                }
-            }
-            
-        }
-
-        IPrimitiveMap IMapMessageCloak.Map
-        {
-            get
-            {
-                return map;
-            }
-        }
-
-        IMapMessageCloak IMapMessageCloak.Copy()
-        {
-            IMapMessageCloak copy = new AMQPMapMessageCloak(Connection);
-            CopyInto(copy);
-            return copy;
-        }
-
-        protected override void CopyInto(IMessageCloak msg)
-        {
-            base.CopyInto(msg);
-            IPrimitiveMap copy = (msg as IMapMessageCloak).Map;
-            foreach (string key in this.map.Keys)
-            {
-                object value = map[key];
-                if (value != null)
-                {
-                    Type valType = value.GetType();
-                    if (valType.IsPrimitive)
-                    {
-                        // value copy primitive value
-                        copy[key] = value;
-                    }
-                    else if (valType.IsArray && valType.Equals(typeof(byte[])))
-                    {
-                        // use IPrimitive map SetBytes for most common implementation this is a deep copy.
-                        byte[] original = value as byte[];
-                        copy.SetBytes(key, original);
-                    }
-                    else if (valType.Equals(typeof(IDictionary)) || valType.Equals(typeof(Amqp.Types.Map)))
-                    {
-                        // reference copy
-                        copy.SetDictionary(key, value as IDictionary);
-                    }
-                    else if (valType.Equals(typeof(IList)) || valType.Equals(typeof(Amqp.Types.List)))
-                    {
-                        // reference copy
-                        copy.SetList(key, value as IList);
-                    }
-                    else
-                    {
-                        copy[key] = value;
-                    }
-                }
-                else
-                {
-                    copy[key] = value;
-                }
-                
-            }
-        }
-
-        public override string ToString()
-        {
-            string result = base.ToString();
-            if(this.map != null)
-            {
-                result +=string.Format("\nMessage Body: {0}\n", ConversionSupport.ToString(this.map));
-            }
-            return result;
-        }
-
-    }
-    
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs
deleted file mode 100644
index 95f1465..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageBuilder.cs
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    using Util;
-    using Cloak;
-    
-    class AMQPMessageBuilder
-    {
-        public static IMessage CreateProviderMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IMessage msg = null;
-            msg = CreateFromMessageAnnontations(consumer, message);
-            if(msg == null)
-            {
-                msg = CreateFromMessageBody(consumer, message);
-            }
-            if(msg == null)
-            {
-                throw new NMSException("Could not create NMS Message.");
-            }
-            return msg;
-        }
-
-        private static IMessage CreateFromMessageBody(MessageConsumer consumer, Amqp.Message message)
-        {
-            IMessage msg = null;
-            object body = message.Body;
-            if(body == null)
-            {
-                if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
-                {
-                    msg = CreateObjectMessage(consumer, message);
-                }
-                else if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
-                {
-                    msg = CreateBytesMessage(consumer, message);
-                }
-                else
-                {
-                    Symbol contentType = GetContentType(message);
-                    if(contentType != null)
-                    {
-                        msg = CreateTextMessage(consumer, message);
-                    }
-                    else
-                    {
-                        msg = CreateMessage(consumer, message);
-                    }
-                }
-            }
-            else if (message.BodySection is Data)
-            {
-                if(IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
-                {
-                    msg = CreateBytesMessage(consumer, message);
-                }
-                else if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
-                {
-                    msg = CreateObjectMessage(consumer, message);
-                }
-                else
-                {
-                    Symbol contentType = GetContentType(message);
-                    if(contentType != null)
-                    {
-                        msg = CreateTextMessage(consumer, message);
-                    }
-                    else
-                    {
-                        msg = CreateBytesMessage(consumer, message);
-                    }
-                }
-            }
-            else if (message.BodySection is AmqpSequence)
-            {
-                msg = CreateObjectMessage(consumer, message);
-            }
-            else if (body is string)
-            {
-                msg = CreateTextMessage(consumer, message);
-            }
-            else if (body is byte[])
-            {
-                msg = CreateBytesMessage(consumer, message);
-            }
-            else
-            {
-                msg = CreateObjectMessage(consumer, message);
-            }
-
-            return msg;
-        }
-
-        private static Symbol GetContentType(Amqp.Message message)
-        {
-            Properties msgProps = message.Properties;
-            if (msgProps == null)
-            {
-                return null;
-            }
-            else
-            {
-                return msgProps.ContentType;
-            }
-        }
-
-        private static bool IsContentType(Symbol type, Amqp.Message message)
-        {
-            Symbol contentType = GetContentType(message);
-            if (contentType == null)
-            {
-                return type == null;
-            }
-            else
-            {
-                return type.Equals(contentType);
-            }
-        }
-
-        private static IMessage CreateFromMessageAnnontations(MessageConsumer consumer, Amqp.Message message)
-        {
-            IMessage msg = null;
-            object objVal = message.MessageAnnotations[SymbolUtil.JMSX_OPT_MSG_TYPE];
-            if(objVal != null && objVal is SByte)
-            {
-                byte type = Convert.ToByte(objVal);
-                switch (type)
-                {
-                    case MessageSupport.JMS_TYPE_MSG:
-                        msg = CreateMessage(consumer, message);
-                        break;
-                    case MessageSupport.JMS_TYPE_BYTE:
-                        msg = CreateBytesMessage(consumer, message);
-                        break;
-                    case MessageSupport.JMS_TYPE_TXT:
-                        msg = CreateTextMessage(consumer, message);
-                        break;
-                    case MessageSupport.JMS_TYPE_OBJ:
-                        msg = CreateObjectMessage(consumer, message);
-                        break;
-                    case MessageSupport.JMS_TYPE_STRM:
-                        msg = CreateStreamMessage(consumer, message);
-                        break;
-                    case MessageSupport.JMS_TYPE_MAP:
-                        msg = CreateMapMessage(consumer, message);
-                        break;
-                    default:
-                        throw new NMSException("Unsupported Msg Annontation type: " + type);
-                }
-                
-            }
-            return msg;
-        }
-
-        private static IMessage CreateMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IMessageCloak cloak = new AMQPMessageCloak(consumer, message);
-            return new Message(cloak);
-        }
-
-        private static IMessage CreateTextMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            ITextMessageCloak cloak = new AMQPTextMessageCloak(consumer, message);
-            return new TextMessage(cloak);
-        }
-
-        private static IMessage CreateStreamMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IStreamMessageCloak cloak = new AMQPStreamMessageCloak(consumer, message);
-            return new StreamMessage(cloak);
-        }
-
-        private static IMessage CreateObjectMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IObjectMessageCloak cloak = new AMQPObjectMessageCloak(consumer, message);
-            return new ObjectMessage(cloak);
-        }
-
-        private static IMessage CreateMapMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IMapMessageCloak cloak = new AMQPMapMessageCloak(consumer, message);
-            return new MapMessage(cloak);
-        }
-
-        private static IMessage CreateBytesMessage(MessageConsumer consumer, Amqp.Message message)
-        {
-            IBytesMessageCloak cloak = new AMQPBytesMessageCloak(consumer, message);
-            return new BytesMessage(cloak);
-        }
-        
-    }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs
deleted file mode 100644
index dde222e..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageCloak.cs
+++ /dev/null
@@ -1,650 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
-using Amqp;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    using Util;
-    using Util.Types.Map.AMQP;
-    using Cloak;
-    using Factory;
-    using System.Reflection;
-
-    class AMQPMessageCloak : IMessageCloak
-    {
-        private TimeSpan timeToLive;
-        private IDestination replyTo;
-        private bool redelivered = false;
-        private string msgId;
-        private IDestination destination;
-        private string correlationId;
-        private IPrimitiveMap properties;
-        private MessagePropertyIntercepter propertyHelper;
-
-        private Header messageHeader = null;
-        private DeliveryAnnotations deliveryAnnontations = null;
-        private MessageAnnotations messageAnnontations = null;
-        private ApplicationProperties applicationProperties = null;
-        private Properties messageProperties = null;
-
-#pragma warning disable CS0414
-        private Footer messageFooter = null;
-#pragma warning restore CS0414
-
-        private byte[] content;
-        private bool readOnlyProperties = false;
-
-        protected Amqp.Message message;
-        protected readonly Connection connection;
-        protected MessageConsumer consumer;
-
-        internal AMQPMessageCloak(Connection c)
-        {
-            message = new Amqp.Message();
-            connection = c;
-            InitMessage();
-        }
-
-        internal AMQPMessageCloak(MessageConsumer c, Amqp.Message msg)
-        {
-            message = msg;
-            consumer = c;
-            connection = c.Session.Connection;
-            InitMessage();
-            InitDeliveryAnnotations();
-        }
-
-
-        #region Internal Properties
-
-        internal Connection Connection { get { return connection; } }
-
-        internal Amqp.Message AMQPMessage { get { return message; } }
-
-        internal virtual byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MSG; } }
-
-        #endregion
-
-        private void InitMessage()
-        {
-            InitMessageHeader();
-            InitMessageProperties();
-            SetMessageAnnotation(SymbolUtil.JMSX_OPT_MSG_TYPE, (sbyte)JMSMessageType);
-        }
-
-        protected virtual void CopyInto(IMessageCloak msg)
-        {
-            MessageTransformation.CopyNMSMessageProperties(this, msg);
-            msg.AckHandler = this.AckHandler;
-        }
-
-        #region Protected Amqp.Message Initialize/Accessor
-
-        protected void InitMessageHeader()
-        {
-            if (this.messageHeader == null && this.message.Header == null)
-            {
-                this.messageHeader = new Header();
-                this.message.Header = this.messageHeader;
-            }
-            else if (this.messageHeader == null && this.message.Header != null)
-            {
-                this.messageHeader = this.message.Header;
-            }
-            else if (this.messageHeader != null && this.message.Header == null)
-            {
-                this.message.Header = this.messageHeader;
-            }
-        }
-
-        protected void InitMessageProperties()
-        {
-            if (this.messageProperties == null && this.message.Properties == null)
-            {
-                this.messageProperties = new Properties();
-                this.message.Properties = this.messageProperties;
-            }
-            else if (this.messageProperties == null && this.message.Properties != null)
-            {
-                this.messageProperties = this.message.Properties;
-            }
-            else if (this.messageProperties != null && this.message.Properties == null)
-            {
-                this.message.Properties = this.messageProperties;
-            }
-        }
-
-        protected void InitApplicationProperties()
-        {
-            if (this.applicationProperties == null && this.message.ApplicationProperties == null)
-            {
-                this.applicationProperties = new ApplicationProperties();
-                this.message.ApplicationProperties = this.applicationProperties;
-            }
-            else if (this.applicationProperties == null && this.message.ApplicationProperties != null)
-            {
-                this.applicationProperties = this.message.ApplicationProperties;
-            }
-            else if (this.applicationProperties != null && this.message.ApplicationProperties == null)
-            {
-                this.message.ApplicationProperties = this.applicationProperties;
-
-            }
-        }
-
-        protected void InitDeliveryAnnotations()
-        {
-            if (this.deliveryAnnontations == null && this.message.DeliveryAnnotations == null)
-            {
-                this.deliveryAnnontations = new DeliveryAnnotations();
-                this.message.DeliveryAnnotations = this.deliveryAnnontations;
-            }
-            else if (this.deliveryAnnontations == null && this.message.DeliveryAnnotations != null)
-            {
-                this.deliveryAnnontations = this.message.DeliveryAnnotations;
-            }
-            else if (this.deliveryAnnontations != null && this.message.DeliveryAnnotations == null)
-            {
-                this.message.DeliveryAnnotations = this.deliveryAnnontations;
-            }
-        }
-
-        protected void InitMessageAnnontations()
-        {
-            if (this.messageAnnontations == null && this.message.MessageAnnotations == null)
-            {
-                this.messageAnnontations = new MessageAnnotations();
-                this.message.MessageAnnotations = messageAnnontations;
-            }
-            else if (this.messageAnnontations == null && this.message.MessageAnnotations != null)
-            {
-                this.messageAnnontations = this.message.MessageAnnotations;
-            }
-            else if (this.messageAnnontations != null && this.message.MessageAnnotations == null)
-            {
-                this.message.MessageAnnotations = this.messageAnnontations;
-            }
-        }
-
-        protected void SetDeliveryAnnotation(Symbol key, object value)
-        {
-            InitDeliveryAnnotations();
-            this.deliveryAnnontations[key] = value;
-        }
-
-        protected void SetMessageAnnotation(Symbol key, object value)
-        {
-            InitMessageAnnontations();
-            messageAnnontations[key] = value;
-        }
-
-        protected object GetMessageAnnotation(Symbol key)
-        {
-            InitMessageAnnontations();
-            return messageAnnontations[key];
-        }
-
-        #endregion
-
-        #region IMessageCloak Properties
-
-        public bool IsReceived { get { return consumer != null; } }
-
-        public virtual byte[] Content
-        {
-            get
-            {
-                return content;
-            }
-
-            set
-            {
-                content = value;
-            }
-        }
-
-        public virtual bool IsBodyReadOnly { get; set; }
-
-        public virtual bool IsPropertiesReadOnly
-        {
-            get
-            {
-                return (this.propertyHelper == null) ? readOnlyProperties : this.propertyHelper.ReadOnly;
-            }
-            set
-            {
-                if (this.propertyHelper != null)
-                    this.propertyHelper.ReadOnly = value;
-                readOnlyProperties = value;
-            }
-        }
-
-
-        public string NMSCorrelationID
-        {
-            get
-            {
-                if ( null != this.correlationId)
-                {
-                    return this.correlationId;
-                }
-                object objId = this.messageProperties.GetCorrelationId();
-                if (objId != null)
-                {
-                    // correlationId strings are returned as-is to the application, otherwise
-                    // convert it to a NMSMessageId string 
-                    if (objId is string)
-                    {
-                        this.correlationId = objId as string;
-                    }
-                    else
-                    {
-                        this.correlationId = MessageSupport.CreateNMSMessageId(objId);
-                    }
-                }
-
-                return this.correlationId;
-            }
-            set
-            {
-                object objId = MessageSupport.CreateAMQPMessageId(value);
-                this.messageProperties.SetCorrelationId(objId);
-                this.correlationId = value;
-            }
-        }
-
-        public MsgDeliveryMode NMSDeliveryMode
-        {
-            get
-            {
-                if (this.messageHeader.Durable)
-                {
-                    return MsgDeliveryMode.Persistent;
-                }
-                else
-                {
-                    return MsgDeliveryMode.NonPersistent;
-                }
-
-            }
-            set
-            {
-                if (value.Equals(MsgDeliveryMode.Persistent))
-                {
-                    this.messageHeader.Durable = true;
-                }
-                else
-                {
-                    this.messageHeader.Durable = false;
-                }
-            }
-        }
-
-        public IDestination NMSDestination
-        {
-            get
-            {
-                if (destination == null && consumer != null)
-                {
-                    object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST);
-                    if (typeObj != null)
-                    {
-                        byte type = Convert.ToByte(typeObj);
-                        destination = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type);
-                        if(destination == null)
-                        {
-                            destination = consumer.Destination;
-                        }
-                    }
-                }
-                return destination;
-            }
-            set
-            {
-                string destString = null;
-                IDestination dest = null;
-                if (value != null) {
-                    destString = UriUtil.GetAddress(value, Connection);
-                    dest = value;
-                }
-                this.messageProperties.To = destString;
-                SetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST, MessageSupport.GetValueForDestination(dest));
-                destination = dest;
-            }
-        }
-
-        public string NMSMessageId
-        {
-            get
-            {
-                object objId = this.messageProperties.GetMessageId();
-                if (this.msgId == null && objId != null)
-                {
-                    this.msgId = MessageSupport.CreateNMSMessageId(objId);
-                }
-                return this.msgId;
-            }
-            set
-            {
-                object msgId = MessageSupport.CreateAMQPMessageId(value);
-                //Tracer.InfoFormat("Set message Id to <{0}>: {1}", msgId.GetType().Name, msgId.ToString());
-                this.messageProperties.SetMessageId(msgId);
-                this.msgId = value;
-            }
-        }
-
-        public MsgPriority NMSPriority
-        {
-            get { return MessageSupport.GetPriorityFromValue(this.messageHeader.Priority); }
-            set
-            {
-                this.messageHeader.Priority = MessageSupport.GetValueForPriority(value);
-            }
-        }
-
-        public bool NMSRedelivered
-        {
-            get
-            {
-                if (this.messageHeader.DeliveryCount > 0)
-                {
-                    redelivered = true;
-                }
-                return redelivered;
-            }
-            set { redelivered = value; }
-        }
-
-        public IDestination NMSReplyTo
-        {
-            get
-            {
-                if (replyTo == null && IsReceived)
-                {
-                    object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO);
-                    if (typeObj != null)
-                    {
-                        byte type = Convert.ToByte(typeObj);
-                        replyTo = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type, true);
-                    }
-                }
-                return replyTo;
-            }
-            set
-            {
-                IDestination dest = null;
-                string destString = null;
-                if (value != null)
-                {
-                    destString = UriUtil.GetAddress(value, Connection);
-                    dest = value;
-                    SetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO, MessageSupport.GetValueForDestination(dest));
-                }
-                this.messageProperties.ReplyTo = destString;
-
-                replyTo = dest;
-            }
-        }
-
-        public DateTime NMSTimestamp
-        {
-            get { return messageProperties.CreationTime; }
-            set
-            {
-                messageProperties.CreationTime = value;
-                if (NMSTimeToLive != null && NMSTimeToLive != TimeSpan.Zero)
-                {
-                    messageProperties.AbsoluteExpiryTime = value + timeToLive;
-                }
-            }
-        }
-
-        public TimeSpan NMSTimeToLive
-        {
-            get
-            {
-                if ( timeToLive != null)
-                {
-                    return timeToLive;
-                }
-                if (messageProperties.AbsoluteExpiryTime == DateTime.MinValue)
-                {
-
-                    timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(this.messageHeader.Ttl));
-                    return timeToLive;
-                }
-                else
-                {
-                    return messageProperties.AbsoluteExpiryTime - NMSTimestamp;
-                }
-            }
-            set
-            {
-                timeToLive = value;
-            }
-        }
-
-        public string NMSType
-        {
-            get { return this.messageProperties.Subject; }
-            set { this.messageProperties.Subject = value; }
-        }
-
-        public IPrimitiveMap Properties
-        {
-            get
-            {
-                if (properties == null)
-                {
-                    InitApplicationProperties();
-                    properties = new AMQPPrimitiveMap(this.applicationProperties);
-                    propertyHelper = new MessagePropertyIntercepter(this, properties, readOnlyProperties);
-                }
-                return propertyHelper;
-            }
-        }
-
-        public int DeliveryCount
-        {
-            get
-            {
-                return Convert.ToInt32(this.messageHeader.DeliveryCount);
-            }
-
-            set
-            {
-                this.messageHeader.DeliveryCount = Convert.ToUInt32(value);
-            }
-        }
-
-        public int RedeliveryCount
-        {
-            get
-            {
-                return DeliveryCount - 1;
-            }
-
-            set
-            {
-                DeliveryCount = value + 1;
-            }
-        }
-
-        public MessageAcknowledgementHandler AckHandler { get; set; }
-        
-        public void Acknowledge()
-        {
-            if (AckHandler != null)
-            {
-                if (connection.IsClosed)
-                {
-                    throw new IllegalStateException("Can not acknowledge Message on closed connection.");
-                }
-            
-                AckHandler.Acknowledge();
-                AckHandler = null;
-            }
-        }
-
-        public virtual void ClearBody()
-        {
-            Content = null;
-        }
-
-        public virtual void ClearProperties()
-        {
-            if (properties != null)
-            {
-                propertyHelper.Clear();
-            }
-        }
-        
-        public virtual IMessageCloak Copy()
-        {
-            IMessageCloak copy = null;
-            switch(JMSMessageType)
-            {
-                case MessageSupport.JMS_TYPE_MSG:
-                    copy = new AMQPMessageCloak(connection);
-                    break;
-                case MessageSupport.JMS_TYPE_BYTE:
-                    copy = new AMQPBytesMessageCloak(connection);
-                    break;
-                case MessageSupport.JMS_TYPE_TXT:
-                    copy = new AMQPTextMessageCloak(connection);
-                    break;
-                case MessageSupport.JMS_TYPE_MAP:
-                    copy = new AMQPMapMessageCloak(connection);
-                    break;
-                case MessageSupport.JMS_TYPE_STRM:
-                    copy = new AMQPStreamMessageCloak(connection);
-                    break;
-                case MessageSupport.JMS_TYPE_OBJ:
-                    copy = new AMQPObjectMessageCloak(connection, (this as AMQPObjectMessageCloak).Type);
-                    break;
-                default:
-                    throw new NMSException("Fatal error Invalid JMS type.");
-            }
-            
-            CopyInto(copy);
-            return copy;
-        }
-
-        public object GetMessageAnnotation(string symbolKey)
-        {
-            Symbol sym = symbolKey;
-            return GetMessageAnnotation(sym);
-        }
-
-        public void SetMessageAnnotation(string symbolKey, object value)
-        {
-            Symbol sym = symbolKey;
-            SetMessageAnnotation(sym, value);
-        }
-
-        public object GetDeliveryAnnotation(string symbolKey)
-        {
-            Symbol sym = symbolKey;
-            return GetDeliveryAnnotation(sym);
-        }
-
-        public void SetDeliveryAnnotation(string symbolKey, object value)
-        {
-            Symbol sym = symbolKey;
-            SetDeliveryAnnotation(sym, value);
-        }
-
-        public string GetContentType()
-        {
-            return GetContentTypeSymbol();
-        }
-
-        public void SetContentType(string type)
-        {
-            SetContentType(new Symbol(type));
-        }
-
-        protected virtual Symbol GetContentTypeSymbol()
-        {
-            return this.messageProperties.ContentType;
-        }
-
-        protected virtual void SetContentType(Symbol type)
-        {
-            this.messageProperties.ContentType = type;
-        }
-        
-        #endregion
-
-        public override string ToString()
-        {
-            string result = string.Format("{0}:\n", this.GetType());
-            result += string.Format("inner amqp message: \n{0}\n", AMQPMessageCloak.ToString(message));
-            result += "NMS Fields = [\n";
-            foreach (MemberInfo info in this.GetType().GetMembers())
-            {
-                if (info is PropertyInfo)
-                {
-                    PropertyInfo prop = info as PropertyInfo;
-                    if (prop.GetGetMethod(true).IsPublic)
-                    {
-                        try
-                        {
-                            Object val = prop.GetValue(this, null);
-                            if (val is IPrimitiveMap )
-                            {
-                                result += prop.Name + " = " + ConversionSupport.ToString(val as IPrimitiveMap) +",\n";
-                            }
-                            else
-                            {
-                                result += string.Format("{0} = {1},\n", prop.Name, val);
-                            }
-                        }catch(TargetInvocationException tie)
-                        {
-                            Tracer.InfoFormat("Failed to invoke Member field accessor: {0}, cause: {1}", prop.Name, tie);
-                        }
-                    }
-                }
-            }
-            result = result.Substring(0, result.Length - 2) + "\n]";
-            return result;
-        }
-
-        public static string ToString(Amqp.Message message)
-        {
-            if (message == null) return "null";
-            string result = "Type="+ message.GetType().Name +":\n";
-            
-            if (message.Header != null)
-            {
-                result += "Message Header: " + message.Header.ToString() + "\n";
-            }
-            
-            return result;
-        }
-    }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs b/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs
deleted file mode 100644
index 454b8e8..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPMessageTransformation.cs
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Message.Factory;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    class AMQPMessageTransformation <T> : MessageTransformation where T:ConnectionInfo
-    {
-        protected readonly Connection connection;
-        protected readonly MessageFactory<T> factory;
-
-        public AMQPMessageTransformation(AMQPMessageFactory<T> fact) : base()
-        {
-            connection = fact.Parent;    
-            factory = fact;
-        }
-
-        protected override IBytesMessage DoCreateBytesMessage()
-        {
-            return factory.CreateBytesMessage();
-        }
-
-        protected override IMapMessage DoCreateMapMessage()
-        {
-            return factory.CreateMapMessage();
-        }
-
-        protected override IMessage DoCreateMessage()
-        {
-            return factory.CreateMessage();
-        }
-
-        protected override IObjectMessage DoCreateObjectMessage()
-        {
-            return factory.CreateObjectMessage(null);
-        }
-
-        protected override IStreamMessage DoCreateStreamMessage()
-        {
-            return factory.CreateStreamMessage();
-        }
-
-        protected override ITextMessage DoCreateTextMessage()
-        {
-            return factory.CreateTextMessage();
-        }
-
-        protected override void DoPostProcessMessage(IMessage message)
-        {
-            // nothing for now
-        }
-
-        protected override IDestination DoTransformDestination(IDestination destination)
-        {
-            return DestinationTransformation.Transform(connection, destination);
-        }
-    }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs
deleted file mode 100644
index 49b6a03..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPObjectMessageCloak.cs
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Runtime.Serialization;
-using System.Runtime.Serialization.Formatters.Binary;
-using System.IO;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    using Amqp;
-    using Cloak;
-    using Util;
-    using Util.Types;
-    
-    class AMQPObjectMessageCloak : AMQPMessageCloak, IObjectMessageCloak
-    {
-
-        public static AMQPObjectEncodingType DEFAULT_ENCODING_TYPE = AMQPObjectEncodingType.AMQP_TYPE;
-
-        private IAMQPObjectSerializer objectSerializer;
-
-        #region Contructors
-        internal AMQPObjectMessageCloak(Apache.NMS.AMQP.Connection c, AMQPObjectEncodingType type) : base(c)
-        {
-            InitializeObjectSerializer(type);
-            Body = null;
-        }
-
-        internal AMQPObjectMessageCloak(MessageConsumer mc, Amqp.Message message) : base(mc, message)
-        {
-            if (message.Properties.ContentType.Equals(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE))
-            {
-                if(message.MessageAnnotations.Map.ContainsKey(MessageSupport.JMS_JAVA_ENCODING) 
-                    && message.MessageAnnotations.Map[MessageSupport.JMS_JAVA_ENCODING].Equals(SymbolUtil.BOOLEAN_TRUE))
-                {
-                    InitializeObjectSerializer(AMQPObjectEncodingType.JAVA_SERIALIZABLE);
-                }
-                else
-                {
-                    InitializeObjectSerializer(AMQPObjectEncodingType.DOTNET_SERIALIZABLE);
-                }
-            }
-            else
-            {
-                InitializeObjectSerializer(AMQPObjectEncodingType.AMQP_TYPE);
-            }
-        }
-
-        #endregion
-
-        #region Internal Properties Fields
-        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_OBJ; } }
-        #endregion
-
-        #region Public IObjectMessageCloak Properties
-        public AMQPObjectEncodingType Type { get { return this.objectSerializer.Type; } }
-
-        public object Body
-        {
-            get
-            {
-                return this.objectSerializer.GetObject();
-            }
-            set
-            {
-                this.objectSerializer.SetObject(value);
-            }
-        }
-
-        public override byte[] Content
-        {
-            get
-            {
-                return null;
-            }
-            set
-            {
-                
-            }
-        }
-
-        #endregion
-
-        #region IMessageCloak Copy Methods
-
-        IObjectMessageCloak IObjectMessageCloak.Copy()
-        {
-            IObjectMessageCloak ocloak = new AMQPObjectMessageCloak(connection, this.objectSerializer.Type);
-            this.CopyInto(ocloak);
-            return ocloak;
-        }
-        
-
-        protected override void CopyInto(IMessageCloak msg)
-        {
-            base.CopyInto(msg);
-            if (msg is IObjectMessageCloak)
-            {
-                IObjectMessageCloak copy = msg as IObjectMessageCloak;
-                if (copy is AMQPObjectMessageCloak)
-                {
-                    this.objectSerializer.CopyInto((copy as AMQPObjectMessageCloak).objectSerializer);
-                }
-                else
-                {
-                    this.objectSerializer.SetObject(copy.Body);
-                }
-            }
-            
-        }
-
-        #endregion
-
-        #region Private Methods
-
-        private void InitializeObjectSerializer(AMQPObjectEncodingType type)
-        {
-            switch (type)
-            {
-                case AMQPObjectEncodingType.AMQP_TYPE:
-                    objectSerializer = new AMQPTypeSerializer(this);
-                    break;
-                case AMQPObjectEncodingType.DOTNET_SERIALIZABLE:
-                    objectSerializer = new DotnetObjectSerializer(this);
-                    break;
-                case AMQPObjectEncodingType.JAVA_SERIALIZABLE:
-                    objectSerializer = new JavaObjectSerializer(this);
-                    break;
-                default:
-                    throw NMSExceptionSupport.Create(new ArgumentException("Unsupported object encoding."));
-            }
-        }
-
-        #endregion
-    }
-
-    #region IAMQPObjectSerializer
-
-    #region IAMQPObjectSerializer Interface
-    internal interface IAMQPObjectSerializer
-    {
-        Amqp.Message Message { get; }
-        void SetObject(object o);
-        object GetObject();
-
-        void CopyInto(IAMQPObjectSerializer serializer);
-
-        AMQPObjectEncodingType Type { get; }
-
-    }
-
-    #endregion
-
-    #region AMQP Type IAMQPObjectSerializer Implementation
-    class AMQPTypeSerializer : IAMQPObjectSerializer
-    {
-        
-        private readonly Amqp.Message amqpMessage;
-        private readonly AMQPObjectMessageCloak message;
-        internal AMQPTypeSerializer(AMQPObjectMessageCloak msg)
-        {
-            amqpMessage = msg.AMQPMessage;
-            message = msg;
-            msg.SetMessageAnnotation(MessageSupport.JMS_AMQP_TYPE_ENCODING, SymbolUtil.BOOLEAN_TRUE);
-        }
-
-        public Message Message { get { return amqpMessage; } }
-
-        public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.AMQP_TYPE; } }
-
-        public void CopyInto(IAMQPObjectSerializer serializer)
-        {
-            serializer.SetObject(GetObject());
-        }
-
-        public object GetObject()
-        {
-            RestrictedDescribed body = amqpMessage.BodySection;
-            if(body == null)
-            {
-                return null;
-            }
-            else if (body is AmqpValue)
-            {
-                AmqpValue value = body as AmqpValue;
-                return value.Value;
-            }
-            else if (body is Data)
-            {
-                return (body as Data).Binary;
-            }
-            else if (body is AmqpSequence)
-            {
-                return (body as AmqpSequence).List;
-            }
-            else
-            {
-                throw new IllegalStateException("Unexpected body type: " + body.GetType().Name);
-            }
-        }
-
-        public void SetObject(object o)
-        {
-            if(o == null)
-            {
-                amqpMessage.BodySection = MessageSupport.NULL_AMQP_VALUE_BODY;
-            }
-            else if (IsNMSObjectTypeSupported(o))
-            {
-                object value = null;
-                if(o is IList)
-                {
-                    value = ConversionSupport.ListToAmqp(o as IList);
-                }
-                else if (o is IPrimitiveMap)
-                {
-                    value = ConversionSupport.NMSMapToAmqp(o as IPrimitiveMap);
-                }
-                else
-                {
-                    value = o;
-                }
-                // to copy the object being set encode a message then decode and take body
-                Amqp.Message copy = new Amqp.Message(value);
-                ByteBuffer buffer = copy.Encode();
-                copy = Message.Decode(buffer);
-                
-                amqpMessage.BodySection = new AmqpValue { Value = copy.Body };
-            }
-            else
-            {
-                throw new ArgumentException("Encoding unexpected object type: " + o.GetType().Name);
-            }
-        }
-
-        private bool IsNMSObjectTypeSupported(object o)
-        {
-            return ConversionSupport.IsNMSType(o) || o is List || o is Map || o is IPrimitiveMap || o is IList;
-        }
-    }
-
-    #endregion
-
-    #region Dotnet Serializable IAMQPObjectSerializer Implementation
-
-    class DotnetObjectSerializer : IAMQPObjectSerializer
-    {
-        private readonly Amqp.Message amqpMessage;
-        private readonly AMQPObjectMessageCloak message;
-        internal DotnetObjectSerializer(AMQPObjectMessageCloak msg)
-        {
-            amqpMessage = msg.AMQPMessage;
-            message = msg;
-            msg.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-            msg.SetMessageAnnotation(MessageSupport.JMS_DONET_ENCODING, SymbolUtil.BOOLEAN_TRUE);
-        }
-
-        public Message Message { get { return amqpMessage; } }
-
-        public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.DOTNET_SERIALIZABLE; } }
-
-        public void CopyInto(IAMQPObjectSerializer serializer)
-        {
-            serializer.SetObject(GetObject());
-        }
-
-        public object GetObject()
-        {
-            byte[] bin = null;
-            if(Message.BodySection == null)
-            {
-                return null;
-            }
-            else if (Message.BodySection is Data)
-            {
-                Data data = Message.BodySection as Data;
-                bin = data.Binary;
-            }
-            // TODO handle other body types.
-
-            if (bin == null || bin.Length == 0)
-            {
-                return null;
-            }
-            else
-            {
-                return GetDeserializedObject(bin);
-            }
-            
-        }
-
-        public void SetObject(object o)
-        {
-            
-            byte[] bin = GetSerializedObject(o);
-            if(bin == null || bin.Length == 0)
-            {
-                amqpMessage.BodySection = MessageSupport.EMPTY_DATA;
-            }
-            else
-            {
-                amqpMessage.BodySection = new Data() { Binary = bin };
-            }
-            
-        }
-
-        private object GetDeserializedObject(byte[] binary)
-        {
-            object result = null;
-
-            MemoryStream stream = null;
-            IFormatter formatter = null;
-            try
-            {
-                stream = new MemoryStream(binary);
-                formatter = new BinaryFormatter();
-                result = formatter.Deserialize(stream);
-            }
-            finally
-            {
-                stream?.Close();
-            }
-
-
-            return result;
-
-        }
-
-        private byte[] GetSerializedObject(object o)
-        {
-            if (o == null) return new byte[] { 0xac,0xed,0x00,0x05,0x70 };
-            MemoryStream stream = null;
-            IFormatter formatter = null;
-            byte[] result = null;
-            try
-            {
-                stream = new MemoryStream();
-                formatter = new BinaryFormatter();
-                formatter.Serialize(stream, o);
-                result = stream.ToArray();
-            }
-            finally
-            {
-                if(stream!= null)
-                {
-                    stream.Close();
-                }
-            }
-            
-            return result;
-        }
-    }
-
-    #endregion
-
-    #region Java Serializable IAMQPObjectSerializer Implementation
-
-    class JavaObjectSerializer : IAMQPObjectSerializer
-    {
-        private readonly Amqp.Message amqpMessage;
-        private readonly AMQPObjectMessageCloak message;
-        internal JavaObjectSerializer(AMQPObjectMessageCloak msg)
-        {
-            amqpMessage = msg.AMQPMessage;
-            message = msg;
-            message.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
-            message.SetMessageAnnotation(MessageSupport.JMS_JAVA_ENCODING, SymbolUtil.BOOLEAN_TRUE);
-        }
-
-        public Message Message { get { return amqpMessage; } }
-
-        public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.JAVA_SERIALIZABLE; } }
-
-        public void CopyInto(IAMQPObjectSerializer serializer)
-        {
-            // TODO fix to copy java serialized object as binary.
-            serializer.SetObject(GetObject());
-        }
-
-        public object GetObject()
-        {
-            throw new NotImplementedException("Java Serialized Object body Not Supported.");
-        }
-
-        public void SetObject(object o)
-        {
-            throw new NotImplementedException("Java Serialized Object body Not Supported.");
-        }
-    }
-
-    #endregion // Java IAMQPObjectSerializer Impl
-
-    #endregion // IAMQPObjectSerializer
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs
deleted file mode 100644
index 27dad32..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPStreamMessageCloak.cs
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Framing;
-using Amqp.Types;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-
-    using Cloak;
-    using Util;
-    using Factory;
-    class AMQPStreamMessageCloak : AMQPMessageCloak, IStreamMessageCloak
-    {
-        private IList list;
-        private int position = 0;
-        internal AMQPStreamMessageCloak(Connection c):base(c)
-        {
-            list = InitializeEmptyBody(true);
-        }
-
-        internal AMQPStreamMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg)
-        {
-            if (msg.BodySection == null)
-            {
-                list = InitializeEmptyBody(true);
-            }
-            else if (msg.BodySection is AmqpSequence)
-            {
-                IList value = (msg.BodySection as AmqpSequence).List;
-                if(value == null)
-                {
-                    list = InitializeEmptyBody(true);
-                }
-                else
-                {
-                    list = value;
-                }
-            }
-            else if (msg.BodySection is AmqpValue)
-            {
-                object value = (msg.BodySection as AmqpValue).Value;
-                if(value == null)
-                {
-                    list = InitializeEmptyBody(false);
-                }
-                else if (value is IList)
-                {
-                    list = value as IList;
-                }
-                else
-                {
-                    throw new IllegalStateException("Unexpected amqp-value body content type: " + value.GetType().Name);
-                }
-            }
-            else
-            {
-                throw new IllegalStateException("Unexpected message body type: " + msg.BodySection.GetType().Name);
-            }
-        }
-
-        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_STRM; } }
-
-        #region Private Methods
-
-        private List InitializeEmptyBody(bool isSequence)
-        {
-            List l = new List();
-            if (isSequence)
-            {
-                AmqpSequence seq = new Amqp.Framing.AmqpSequence();
-                message.BodySection = seq;
-                seq.List = l;
-                
-            }
-            else
-            {
-                Amqp.Framing.AmqpValue val = new Amqp.Framing.AmqpValue();
-                val.Value = l;
-                message.BodySection = val;
-            }
-            return l;
-        }
-
-        private bool IsEmpty { get { return list.Count <= 0; } }
-
-        #endregion
-
-        #region IStreamMessageCloak Methods
-        public bool HasNext { get { return !IsEmpty && position < list.Count; } }
-
-        public object Peek()
-        {
-            if(IsEmpty || position >= list.Count)
-            {
-                throw new EndOfStreamException("Attempt to read past the end of stream");
-            }
-            object value = list[position];
-            if(value != null && value is byte[])
-            {
-                byte[] binary = value as byte[];
-                byte[] bin = new byte[binary.Length];
-                binary.CopyTo(bin, 0);
-                value = bin;
-            }
-            return value;
-        }
-
-        public void Pop()
-        {
-            if (IsEmpty || position > list.Count)
-            {
-                throw new EndOfStreamException("Attempt to read past the end of stream");
-            }
-            position++;
-        }
-
-        public void Put(object value)
-        {
-            object entry = value;
-            if (entry != null && entry is byte[])
-            {
-                byte[] bin = new byte[(entry as byte[]).Length];
-                (entry as byte[]).CopyTo(bin, 0);
-                entry = bin;
-            }
-            if (list.Add(entry) < 0)
-            {
-                throw NMSExceptionSupport.Create(string.Format("Failed to add {0} to stream.", entry.ToString()), null);
-            }
-            position++;
-        }
-
-        public void Reset()
-        {
-            position = 0;
-        }
-
-        public override void ClearBody()
-        {
-            base.ClearBody();
-            list.Clear();
-            position = 0;
-        }
-
-        IStreamMessageCloak IStreamMessageCloak.Copy()
-        {
-            return base.Copy() as IStreamMessageCloak;
-        }
-
-        protected override void CopyInto(IMessageCloak msg)
-        {
-            base.CopyInto(msg);
-            if(msg is IStreamMessageCloak)
-            {
-                IStreamMessageCloak copy = (msg as IStreamMessageCloak);
-                
-                foreach(object o in list)
-                {
-                    copy.Put(o);
-                }
-            }
-        }
-        public override string ToString()
-        {
-            string result = base.ToString();
-            result += "\nMessage Body: {";
-            foreach(object o in list)
-            {
-                //
-                // handle byte arrays for now
-                // add more special handlers as needed.
-                //
-                if (o is byte[])
-                {
-
-                    result += string.Format("\n{0} len={1}: {2}", o.GetType(), (o as byte[]).Length, BitConverter.ToString(o as byte[]).Replace("-", " "));
-                }
-                else
-                {
-                    result += string.Format("\n{0}: {1}", o.GetType(), o.ToString());
-                }
-            }
-            result += "\n}";
-            return result;
-        }
-        #endregion
-    }
-}
diff --git a/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs b/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs
deleted file mode 100644
index dd093bc..0000000
--- a/src/NMS.AMQP/Message/AMQP/AMQPTextMessageCloak.cs
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp.Types;
-using Amqp.Framing;
-
-namespace Apache.NMS.AMQP.Message.AMQP
-{
-    using Cloak;
-    using Util;
-    using Factory;
-
-    class AMQPTextMessageCloak : AMQPMessageCloak, ITextMessageCloak
-    {
-        #region Constructor
-
-        internal AMQPTextMessageCloak(Connection c) : base(c) {}
-
-        internal AMQPTextMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg) {}
-
-        #endregion
-
-        internal override byte JMSMessageType
-        {
-            get
-            {
-                return MessageSupport.JMS_TYPE_TXT;
-            }
-        }
-
-        public string Text
-        {
-            get
-            {
-                return GetTextFromBody();
-            }
-
-            set
-            {
-                AmqpValue val = new AmqpValue();
-                val.Value = value;
-                this.message.BodySection = val;
-            }
-        }
-        
-        ITextMessageCloak ITextMessageCloak.Copy()
-        {
-            ITextMessageCloak tcloak = new AMQPTextMessageCloak(connection);
-            CopyInto(tcloak);
-            return tcloak;
-        }
-        
-        protected override void CopyInto(IMessageCloak msg)
-        {
-            base.CopyInto(msg);
-            (msg as ITextMessageCloak).Text = Text;
-        }
-
-        private static string DecodeBinaryBody(byte[] body)
-        {
-            string result = string.Empty;
-            if(body != null && body.Length > 0)
-            {
-                result = Encoding.UTF8.GetString(body);
-            }
-            return result;
-        }
-
-        private string GetTextFromBody()
-        {
-            string result = string.Empty;
-            RestrictedDescribed body = this.message.BodySection;
-            if(body == null)
-            {
-                return result;
-            }
-            else if (body is Data)
-            {
-                byte[] data = (body as Data).Binary;
-                result = DecodeBinaryBody(data);
-            }
-            else if(body is AmqpValue)
-            {
-                object value = (body as AmqpValue).Value;
-                if(value == null)
-                {
-                    return result;
-                }
-                else if (value is byte[])
-                {
-                    result = DecodeBinaryBody(value as byte[]);
-                }
-                else if (value is string)
-                {
-                    result = value as string;
-                }
-                else
-                {
-                    throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
-                }
-            }
-            else
-            {
-                throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
-            }
-
-
-            return result;
-        }
-        public override string ToString()
-        {
-            string result = base.ToString();
-            if (this.Text != null)
-            {
-                result += string.Format("\nMessage Body: {0}\n", Text);
-            }
-            return result;
-        }
-
-    }
-}
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/AckType.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/AckType.cs
index b85f947..248f0b3 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/AckType.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message
 {
-    interface ITextMessageCloak : IMessageCloak
+    public enum AckType
     {
-        string Text { get; set; }
-        new ITextMessageCloak Copy();
+        ACCEPTED = 0,
+        REJECTED = 1,
+        RELEASED = 2,
+        MODIFIED_FAILED = 3,
+        MODIFIED_FAILED_UNDELIVERABLE = 4,
+        // Conceptual
+        DELIVERED = 5
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
similarity index 69%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
index 118b5ab..f165cfe 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,23 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 using System.IO;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    internal interface IBytesMessageCloak : IMessageCloak
+    public interface INmsBytesMessageFacade : INmsMessageFacade
     {
-        BinaryReader getDataReader();
-        BinaryWriter getDataWriter();
-
-        new IBytesMessageCloak Copy();
-
-        int BodyLength { get; }
+        BinaryReader GetDataReader();
+        BinaryWriter GetDataWriter();
         void Reset();
+        long BodyLength { get; }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
index acb649a..18b9247 100644
--- a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMapMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,19 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    interface IMapMessageCloak : IMessageCloak
+    public interface INmsMapMessageFacade : INmsMessageFacade
     {
         IPrimitiveMap Map { get; }
-        new IMapMessageCloak Copy();
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
similarity index 51%
rename from src/NMS.AMQP/Message/Cloak/IMessageCloak.cs
rename to src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
index 623449c..e91ef24 100644
--- a/src/NMS.AMQP/Message/Cloak/IMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
@@ -15,47 +15,29 @@
  * limitations under the License.
  */
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
 
-
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    /// <summary>
-    /// Provider specific Cloak Interface from provider implementation.
-    /// </summary>
-    interface IMessageCloak : IMessage
+    public interface INmsMessageFacade
     {
-        byte[] Content
-        {
-            get;
-            set;
-        }
-
-        bool IsBodyReadOnly { get; set; }
-
-        bool IsPropertiesReadOnly { get; set; }
-
-        bool IsReceived { get; }
-
-        IMessageCloak Copy();
-
-        object GetMessageAnnotation(string symbolKey);
-
-        void SetMessageAnnotation(string symbolKey, object value);
-
-        object GetDeliveryAnnotation(string symbolKey);
-
-        void SetDeliveryAnnotation(string symbolKey, object value);
-
+        NmsMessage AsMessage();
+        void ClearBody();
         int DeliveryCount { get; set; }
-
         int RedeliveryCount { get; set; }
-
-        MessageAcknowledgementHandler AckHandler { get; set; }
-        
+        void OnSend(TimeSpan producerTtl);
+        string NMSMessageId { get; set; }
+        IPrimitiveMap Properties { get; }
+        string NMSCorrelationID { get; set; }
+        IDestination NMSDestination { get; set; }
+        TimeSpan NMSTimeToLive { get; set; }
+        MsgDeliveryMode NMSDeliveryMode { get; set; }
+        MsgPriority NMSPriority { get; set; }
+        bool NMSRedelivered { get; set; }
+        IDestination NMSReplyTo { get; set; }
+        DateTime NMSTimestamp { get; set; }
+        string NMSType { get; set; }
+        DateTime Expiration { get; set; }
+        sbyte JmsMsgType { get; }
+        INmsMessageFacade Copy();
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
similarity index 74%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
index b85f947..cea2edc 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsObjectMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,17 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    interface ITextMessageCloak : IMessageCloak
+    public interface INmsObjectMessageFacade : INmsMessageFacade
     {
-        string Text { get; set; }
-        new ITextMessageCloak Copy();
+        object Body { get; set; }        
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
similarity index 82%
copy from src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
index 8775d1b..d7e0523 100644
--- a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsStreamMessageFacade.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,26 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System.IO;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    interface IStreamMessageCloak : IMessageCloak
+    public interface INmsStreamMessageFacade : INmsMessageFacade
     {
-
-
-        new IStreamMessageCloak Copy();
-
-        bool HasNext { get; }
-
-        void Reset();
-
-        void Put(object value);
-
         object Peek();
-
         void Pop();
-
-        
+        void Reset();
+        void Put(object value);
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs b/src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
similarity index 77%
copy from src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
copy to src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
index b85f947..0a1aaa9 100644
--- a/src/NMS.AMQP/Message/Cloak/ITextMessageCloak.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsTextMessageFacade.cs
@@ -14,17 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message.Facade
 {
-    interface ITextMessageCloak : IMessageCloak
+    public interface INmsTextMessageFacade : INmsMessageFacade
     {
         string Text { get; set; }
-        new ITextMessageCloak Copy();
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs b/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs
deleted file mode 100644
index 8b0268d..0000000
--- a/src/NMS.AMQP/Message/Factory/AMQPMessageFactory.cs
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Amqp;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
-    using Cloak;
-    using AMQP;
-    class AMQPMessageFactory<T> : MessageFactory<T> where T : ConnectionInfo
-    {
-
-        protected readonly AMQPMessageTransformation<T> transformFactory;
-        protected AMQPObjectEncodingType encodingType = AMQPObjectEncodingType.UNKOWN;
-
-        internal AMQPMessageFactory(NMSResource<T> resource) : base(resource)
-        {
-            transformFactory = new AMQPMessageTransformation<T>(this);
-            InitEncodingType();
-        }
-
-        internal MessageTransformation TransformFactory { get { return transformFactory; } }
-        
-        internal Connection Parent { get { return parent as Connection; } }
-
-        public override MessageTransformation GetTransformFactory()
-        {
-            return transformFactory;
-        }
-
-        public override IBytesMessage CreateBytesMessage()
-        {
-            IBytesMessageCloak cloak = new AMQPBytesMessageCloak(Parent);
-            return new BytesMessage(cloak);
-        }
-
-        public override IBytesMessage CreateBytesMessage(byte[] body)
-        {
-            IBytesMessage msg = CreateBytesMessage();
-            msg.WriteBytes(body);
-            return msg;
-        }
-
-        public override IMapMessage CreateMapMessage()
-        {
-            IMapMessageCloak cloak = new AMQPMapMessageCloak(Parent);
-            return new MapMessage(cloak);
-        }
-
-        public override IMessage CreateMessage()
-        {
-            IMessageCloak cloak = new AMQPMessageCloak(Parent);
-            return new Message(cloak);
-        }
-
-        public override IObjectMessage CreateObjectMessage(object body)
-        {
-            IObjectMessageCloak cloak = new AMQPObjectMessageCloak(Parent, encodingType);
-            return new ObjectMessage(cloak) { Body=body };
-        }
-
-        public override IStreamMessage CreateStreamMessage()
-        {
-            IStreamMessageCloak cloak = new AMQPStreamMessageCloak(Parent);
-            return new StreamMessage(cloak);
-        }
-
-        public override ITextMessage CreateTextMessage()
-        {
-            ITextMessageCloak cloak = new AMQPTextMessageCloak(Parent);
-            return new TextMessage(cloak);
-        }
-
-        public override ITextMessage CreateTextMessage(string text)
-        {
-            ITextMessage msg = CreateTextMessage();
-            msg.Text = text;
-            return msg;
-        }
-
-        private void InitEncodingType()
-        {
-            encodingType = ConnectionEncodingType(Parent);
-            Tracer.InfoFormat("Message Serialization for connection : {0}, is set to: {1}.", Parent.ClientId, encodingType.ToString());
-        }
-
-
-        private const string AMQP_TYPE = "amqp";
-        private const string DOTNET_TYPE = "dotnet";
-        private const string JAVA_TYPE = "java";
-
-        private static AMQPObjectEncodingType ConnectionEncodingType(Connection connection)
-        {
-            string value = connection.Properties[Connection.MESSAGE_OBJECT_SERIALIZATION_PROP];
-            if (value == null) return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
-            if (value.ToLower().StartsWith(AMQP_TYPE))
-            {
-                return AMQPObjectEncodingType.AMQP_TYPE;
-            }
-            else if (value.ToLower().StartsWith(DOTNET_TYPE))
-            {
-                return AMQPObjectEncodingType.DOTNET_SERIALIZABLE;
-            }
-            else if (value.ToLower().StartsWith(JAVA_TYPE))
-            {
-                return AMQPObjectEncodingType.JAVA_SERIALIZABLE;
-            }
-            else
-            {
-                return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
-            }
-        }
-
-    }
-}
diff --git a/src/NMS.AMQP/Message/Factory/IMessageFactory.cs b/src/NMS.AMQP/Message/Factory/IMessageFactory.cs
deleted file mode 100644
index ed9b2d7..0000000
--- a/src/NMS.AMQP/Message/Factory/IMessageFactory.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
-    interface IMessageFactory 
-    {
-        MessageTransformation GetTransformFactory();
-
-        // Factory methods to create messages
-
-        /// <summary>
-        /// Creates a new message with an empty body
-        /// </summary>
-        IMessage CreateMessage();
-
-        /// <summary>
-        /// Creates a new text message with an empty body
-        /// </summary>
-        ITextMessage CreateTextMessage();
-
-        /// <summary>
-        /// Creates a new text message with the given body
-        /// </summary>
-        ITextMessage CreateTextMessage(string text);
-
-        /// <summary>
-        /// Creates a new Map message which contains primitive key and value pairs
-        /// </summary>
-        IMapMessage CreateMapMessage();
-
-        /// <summary>
-        /// Creates a new Object message containing the given .NET object as the body
-        /// </summary>
-        IObjectMessage CreateObjectMessage(object body);
-
-        /// <summary>
-        /// Creates a new binary message
-        /// </summary>
-        IBytesMessage CreateBytesMessage();
-
-        /// <summary>
-        /// Creates a new binary message with the given body
-        /// </summary>
-        IBytesMessage CreateBytesMessage(byte[] body);
-
-        /// <summary>
-        /// Creates a new stream message
-        /// </summary>
-        IStreamMessage CreateStreamMessage();
-
-        
-    }
-}
diff --git a/src/NMS.AMQP/Message/Factory/MessageFactory.cs b/src/NMS.AMQP/Message/Factory/MessageFactory.cs
deleted file mode 100644
index 49e9b43..0000000
--- a/src/NMS.AMQP/Message/Factory/MessageFactory.cs
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Concurrent;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Message.Factory
-{
-    internal abstract class MessageFactory<T> : IMessageFactory where T : ResourceInfo
-    {
-        private static readonly IDictionary<Id, IMessageFactory> resgistry;
-
-        static MessageFactory()
-        {
-            resgistry = new ConcurrentDictionary<Id, IMessageFactory>();
-        }
-
-        public static void Register(NMSResource<T> resource)
-        {
-            if (resource is Connection)
-            {
-                resgistry.Add(resource.Id, (new AMQPMessageFactory<ConnectionInfo>(resource as Connection)) as IMessageFactory);
-            }
-            else
-            {
-                throw new NMSException("Invalid Message Factory Type " + resource.GetType().FullName);
-            }
-        }
-
-        public static void Unregister(NMSResource<T> resource)
-        {
-            if(resource != null && resource.Id != null)
-            {
-                if(!resgistry.Remove(resource.Id))
-                {
-                    if(resgistry.ContainsKey(resource.Id))
-                        Tracer.WarnFormat("MessageFactory was not able to unregister resource {0}.", resource.Id);
-                }
-            }
-        }
-
-        public static IMessageFactory Instance(Connection resource)
-        {
-            IMessageFactory factory = null;
-            resgistry.TryGetValue(resource.Id, out factory);
-            if(factory == null)
-            {
-                throw new NMSException("Resource "+resource+" is not registered as message factory.");
-            }
-            return factory;
-        }
-        
-        protected readonly NMSResource<T> parent;
-
-        protected  MessageFactory(NMSResource<T> resource)
-        {
-            parent = resource;
-        }
-
-        public abstract MessageTransformation GetTransformFactory();
-        public abstract IMessage CreateMessage();
-        public abstract ITextMessage CreateTextMessage();
-        public abstract ITextMessage CreateTextMessage(string text);
-        public abstract IMapMessage CreateMapMessage();
-        public abstract IObjectMessage CreateObjectMessage(object body);
-        public abstract IBytesMessage CreateBytesMessage();
-        public abstract IBytesMessage CreateBytesMessage(byte[] body);
-        public abstract IStreamMessage CreateStreamMessage();
-        
-    }
-}
diff --git a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs b/src/NMS.AMQP/Message/INmsMessageFactory.cs
similarity index 59%
copy from src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
copy to src/NMS.AMQP/Message/INmsMessageFactory.cs
index 8775d1b..7e48d34 100644
--- a/src/NMS.AMQP/Message/Cloak/IStreamMessageCloak.cs
+++ b/src/NMS.AMQP/Message/INmsMessageFactory.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,26 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System.IO;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+using Apache.NMS.AMQP.Provider.Amqp.Message;
+
+namespace Apache.NMS.AMQP.Message
 {
-    interface IStreamMessageCloak : IMessageCloak
+    public interface INmsMessageFactory
     {
-
-
-        new IStreamMessageCloak Copy();
-
-        bool HasNext { get; }
-
-        void Reset();
-
-        void Put(object value);
-
-        object Peek();
-
-        void Pop();
-
+        NmsMessage CreateMessage();
+        NmsTextMessage CreateTextMessage();
+        NmsTextMessage CreateTextMessage(string payload);
+        NmsStreamMessage CreateStreamMessage();
+        NmsBytesMessage CreateBytesMessage();
+        NmsBytesMessage CreateBytesMessage(byte[] body);
+        NmsMapMessage CreateMapMessage();
+        NmsObjectMessage CreateObjectMessage();
+        NmsObjectMessage CreateObjectMessage(object body);
         
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
similarity index 64%
copy from src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
copy to src/NMS.AMQP/Message/InboundMessageDispatch.cs
index 118b5ab..e069a64 100644
--- a/src/NMS.AMQP/Message/Cloak/IBytesMessageCloak.cs
+++ b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
@@ -14,23 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+namespace Apache.NMS.AMQP.Message
 {
-    internal interface IBytesMessageCloak : IMessageCloak
+    public class InboundMessageDispatch
     {
-        BinaryReader getDataReader();
-        BinaryWriter getDataWriter();
+        public Id ConsumerId { get; set; }
+        public ConsumerInfo ConsumerInfo { get; set; }
+        public NmsMessage Message { get; set; }
+        public bool IsDelivered { get; set; }
 
-        new IBytesMessageCloak Copy();
-
-        int BodyLength { get; }
-        void Reset();
+        public int RedeliveryCount => Message?.Facade.RedeliveryCount ?? 0;
+        public bool EnqueueFirst { get; set; }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/MapMessage.cs b/src/NMS.AMQP/Message/MapMessage.cs
deleted file mode 100644
index 8679e3a..0000000
--- a/src/NMS.AMQP/Message/MapMessage.cs
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Message.Cloak;
-
-namespace Apache.NMS.AMQP.Message
-{
-    /// <summary>
-    /// Apache.NMS.AMQP.Message.MapMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IMapMessage interface.
-    /// Apache.NMS.AMQP.Message.MapMessage uses the Apache.NMS.AMQP.Message.Cloak.IMapMessageCloak interface to detach from the underlying AMQP 1.0 engine.
-    /// </summary>
-    class MapMessage : Message, IMapMessage
-    {
-        new private readonly IMapMessageCloak cloak;
-        private PrimitiveMapInterceptor map;
-
-        internal MapMessage(IMapMessageCloak message) : base(message)
-        {
-            cloak = message;
-        }
-
-        public override bool IsReadOnly
-        {
-            get { return base.IsReadOnly; }
-            internal set
-            {
-                if (map != null)
-                {
-                    map.ReadOnly = value;
-                }
-                base.IsReadOnly = value;
-            }
-        }
-
-        public IPrimitiveMap Body
-        {
-            get
-            {
-                if(map == null)
-                {
-                    map = new PrimitiveMapInterceptor(this, cloak.Map, IsReadOnly, true);
-                }
-                return map;
-            }
-        }
-
-        internal override Message Copy()
-        {
-            return new MapMessage(cloak.Copy());
-        }
-    }
-}
diff --git a/src/NMS.AMQP/Message/Message.cs b/src/NMS.AMQP/Message/Message.cs
deleted file mode 100644
index b2ba218..0000000
--- a/src/NMS.AMQP/Message/Message.cs
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Message
-{
-
-    using Cloak;
-    
-    internal enum AckType
-    {
-        ACCEPTED = 0,
-        REJECTED = 1,
-        RELEASED = 2,
-        MODIFIED_FAILED = 3,
-        MODIFIED_FAILED_UNDELIVERABLE = 4,
-    }
-
-    /// <summary>
-    /// Apache.NMS.AMQP.Message.Message is the root message class that implements the Apache.NMS.IMessage interface.
-    /// Apache.NMS.AMQP.Message.Message uses the Apache.NMS.AMQP.Message.Cloak.IMessageCloak interface to detach from the underlying AMQP 1.0 engine.
-    /// </summary>
-    class Message : IMessage
-    {
-
-        public static readonly string MESSAGE_VENDOR_ACK_PROP = PropertyUtil.CreateProperty("ACK.TYPE", "AMQP");
-
-        protected readonly IMessageCloak cloak;
-        private IPrimitiveMap propertyHelper = null;
-
-        #region Constructors
-
-        internal Message(IMessageCloak message)
-        {
-            this.cloak = message;
-        }
-        
-        #endregion
-        
-        #region Protected Methods
-
-        protected void FailIfReadOnlyMsgBody()
-        {
-            if(IsReadOnly == true)
-            {
-                throw new MessageNotWriteableException("Message is in Read-Only mode.");
-            }
-        }
-
-        protected void FailIfWriteOnlyMsgBody()
-        {
-            if (IsReadOnly == false)
-            {
-                throw new MessageNotReadableException("Message is in Write-Only mode.");
-            }
-        }
-
-        #endregion
-
-        #region Public Properties
-
-        public virtual byte[] Content
-        {
-            get
-            {
-                return cloak.Content;
-            }
-
-            set
-            {
-                cloak.Content = value;
-            }
-        }
-
-        public virtual bool IsReadOnly
-        {
-            get { return cloak.IsBodyReadOnly; }
-            internal set { cloak.IsBodyReadOnly = value; }
-        }
-
-        public virtual bool IsReadOnlyProperties
-        {
-            get { return cloak.IsPropertiesReadOnly; }
-            internal set { cloak.IsPropertiesReadOnly = value; }
-        }
-
-        #endregion
-
-        #region IMessage Properties
-
-
-        public string NMSCorrelationID
-        {
-            get { return cloak.NMSCorrelationID; }
-            set { cloak.NMSCorrelationID = value; }
-        }
-
-        public MsgDeliveryMode NMSDeliveryMode
-        {
-            get { return cloak.NMSDeliveryMode; }
-            set { cloak.NMSDeliveryMode = value; }
-        }
-
-        public IDestination NMSDestination
-        {
-            get { return cloak.NMSDestination; }
-            set { cloak.NMSDestination = value; }
-        }
-
-        public string NMSMessageId
-        {
-            get { return cloak.NMSMessageId; }
-            set { cloak.NMSMessageId = value; }
-        }
-
-        public MsgPriority NMSPriority
-        {
-            get { return cloak.NMSPriority; }
-            set { cloak.NMSPriority = value; }
-        }
-
-        public bool NMSRedelivered
-        {
-            get { return cloak.NMSRedelivered; }
-            set { cloak.NMSRedelivered = value; }
-        }
-
-        public IDestination NMSReplyTo
-        {
-            get { return cloak.NMSReplyTo; }
-            set { cloak.NMSReplyTo = value; }
-        }
-
-        public DateTime NMSTimestamp
-        {
-            get { return cloak.NMSTimestamp; }
-            set { cloak.NMSTimestamp = value; }
-        }
-
-        public TimeSpan NMSTimeToLive
-        {
-            get { return cloak.NMSTimeToLive; }
-            set { cloak.NMSTimeToLive = value; }
-        }
-
-        public string NMSType
-        {
-            get { return cloak.NMSType; }
-            set { cloak.NMSType = value; }
-        }
-
-        public IPrimitiveMap Properties
-        {
-            get
-            {
-                if(propertyHelper == null)
-                {
-                    propertyHelper = new NMSMessagePropertyInterceptor(this, cloak.Properties);
-                }
-                return propertyHelper;
-            }
-        }
-
-        #endregion
-
-        #region IMessage Methods
-
-        public virtual void Acknowledge()
-        {
-            cloak.Acknowledge();
-        }
-
-        public virtual void ClearBody()
-        {
-            cloak.ClearBody();
-        }
-
-        public void ClearProperties()
-        {
-            propertyHelper.Clear();
-        }
-
-        #endregion
-
-        #region Internal Methods
-
-        internal IMessageCloak GetMessageCloak()
-        {
-            return cloak;
-        }
-
-        internal virtual Message Copy()
-        {
-            return new Message(this.cloak.Copy());
-        }
-
-        protected virtual void CopyInto(Message other)
-        {
-
-        }
-
-        #endregion
-
-        public override string ToString()
-        {
-            return base.ToString() + ":\n Impl Type: " + cloak.ToString();
-        }
-
-    }
-
-    internal class MessageAcknowledgementHandler
-    {
-        
-        private MessageConsumer consumer;
-        private Session session;
-        private Message message;
-        
-        private AckType? atype = null;
-
-        public MessageAcknowledgementHandler(MessageConsumer mc, Message msg)
-        {
-            consumer = mc;
-            session = consumer.Session;
-            message = msg;
-            
-        }
-
-        public AckType AcknowledgementType
-        {
-            get
-            {
-                return atype ?? MessageSupport.DEFAULT_ACK_TYPE;
-            }
-            set
-            {
-                atype = value;
-            }
-        }
-
-        public void ClearAckType()
-        {
-            atype = null;
-        }
-
-        public bool IsAckTypeSet
-        {
-            get => atype != null;
-        }
-
-        public void Acknowledge()
-        {
-            
-            if (session.AcknowledgementMode.Equals(AcknowledgementMode.IndividualAcknowledge))
-            {
-                consumer.AcknowledgeMessage(message, AcknowledgementType);
-            }
-            else // Session Ackmode  AcknowledgementMode.ClientAcknowledge
-            {
-                session.Acknowledge(AcknowledgementType);
-            }
-        }
-
-    }
-
-}
diff --git a/src/NMS.AMQP/Message/BytesMessage.cs b/src/NMS.AMQP/Message/NmsBytesMessage.cs
similarity index 52%
rename from src/NMS.AMQP/Message/BytesMessage.cs
rename to src/NMS.AMQP/Message/NmsBytesMessage.cs
index 8186214..62eaa5b 100644
--- a/src/NMS.AMQP/Message/BytesMessage.cs
+++ b/src/NMS.AMQP/Message/NmsBytesMessage.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,136 +14,69 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 using System.IO;
-using Apache.NMS;
+using Apache.NMS.AMQP.Message.Facade;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.AMQP.Message
 {
-    using Cloak;
-    /// <summary>
-    /// Apache.NMS.AMQP.Message.BytesMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IBytesMessage interface.
-    /// Apache.NMS.AMQP.Message.BytesMessage uses the Apache.NMS.AMQP.Message.Cloak.IBytesMessageCloak interface to detach from the underlying AMQP 1.0 engine.
-    /// </summary>
-    class BytesMessage : Message, IBytesMessage
+    public class NmsBytesMessage : NmsMessage, IBytesMessage
     {
         private BinaryWriter dataOut = null;
         private BinaryReader dataIn = null;
-        private MemoryStream outputBuffer = null;
-        private readonly new IBytesMessageCloak cloak;
-
-        #region Constructor
-
-        internal BytesMessage(IBytesMessageCloak message) : base(message)
-        {
-            cloak = message;
-        }
-
-        #endregion
-
-        internal override Message Copy()
-        {
-            return new BytesMessage(this.cloak.Copy());
-        }
+        private readonly INmsBytesMessageFacade facade;
 
-        #region Private Methods
-
-        private void InitializeReadingMode()
+        public NmsBytesMessage(INmsBytesMessageFacade facade) : base(facade)
         {
-            FailIfWriteOnlyMsgBody();
-            if(dataIn == null || dataIn.BaseStream == null)
-            {
-                dataIn = cloak.getDataReader();
-            }
+            this.facade = facade;
         }
 
-        private void InitializeWritingMode()
+        public byte[] Content
         {
-            FailIfReadOnlyMsgBody();
-            if(dataOut == null )
+            get
             {
-                dataOut = cloak.getDataWriter();
+                byte[] buffer = new byte [BodyLength];
+                ReadBytes(buffer);
+                return buffer;
             }
+            set => WriteBytes(value);
         }
 
-        private void StoreContent()
+        public byte ReadByte()
         {
-            if(dataOut != null)
+            InitializeReading();
+            try
             {
-                dataOut.Close();
-                base.Content = outputBuffer.ToArray();
-
-                dataOut = null;
-                outputBuffer = null;
+                return dataIn.ReadByte();
             }
-        }
-
-        #endregion
-
-        #region IBytesMessage Properties
-
-        public override byte[] Content
-        {
-            get
+            catch (EndOfStreamException e)
             {
-                byte[] buffer = null;
-                InitializeReadingMode();
-                if(this.cloak.BodyLength != 0)
-                {
-                    buffer = new byte[this.cloak.BodyLength];
-                    dataIn.Read(buffer, 0, buffer.Length);
-                }
-                return buffer;
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
             }
-            set
+            catch (IOException e)
             {
-                InitializeWritingMode();
-                if(value != null)
-                {
-                    this.dataOut.Write(value, 0, value.Length);
-                }    
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
             }
         }
 
-        public long BodyLength
+        public void WriteByte(byte value)
         {
-            get
+            InitializeWriting();
+            try
             {
-                InitializeReadingMode();
-                return this.cloak.BodyLength;
+                this.dataOut.Write(value);
+            }
+            catch (IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
-        }
-
-        #endregion
-
-        #region IBytesMessage Methods
-
-        public override void ClearBody()
-        {
-            dataIn = null;
-            dataOut = null;
-            outputBuffer = null;
-            IsReadOnly = false;
-            base.ClearBody();
-        }
-
-        public void Reset()
-        {
-            dataIn = null;
-            dataOut = null;
-            outputBuffer = null;
-            this.cloak.Reset();
-            IsReadOnly = true;
         }
 
         public bool ReadBoolean()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
                 return dataIn.ReadBoolean();
@@ -158,29 +91,25 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public byte ReadByte()
+        public void WriteBoolean(bool value)
         {
-            InitializeReadingMode();
+            InitializeWriting();
             try
             {
-                return dataIn.ReadByte();
-            }
-            catch (EndOfStreamException e)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(e);
+                this.dataOut.Write(value);
             }
             catch (IOException e)
             {
-                throw NMSExceptionSupport.CreateMessageFormatException(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public int ReadBytes(byte[] value)
+        public char ReadChar()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
-                return dataIn.Read(value, 0, value.Length);
+                return dataIn.ReadChar();
             }
             catch (EndOfStreamException e)
             {
@@ -192,29 +121,25 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public int ReadBytes(byte[] value, int length)
+        public void WriteChar(char value)
         {
-            InitializeReadingMode();
+            InitializeWriting();
             try
             {
-                return dataIn.Read(value, 0, length);
-            }
-            catch (EndOfStreamException e)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(e);
+                this.dataOut.Write(value);
             }
             catch (IOException e)
             {
-                throw NMSExceptionSupport.CreateMessageFormatException(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public char ReadChar()
+        public short ReadInt16()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
-                return dataIn.ReadChar();
+                return dataIn.ReadInt16();
             }
             catch (EndOfStreamException e)
             {
@@ -226,29 +151,25 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public double ReadDouble()
+        public void WriteInt16(short value)
         {
-            InitializeReadingMode();
+            InitializeWriting();
             try
             {
-                return dataIn.ReadDouble();
-            }
-            catch (EndOfStreamException e)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(e);
+                this.dataOut.Write(value);
             }
             catch (IOException e)
             {
-                throw NMSExceptionSupport.CreateMessageFormatException(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public short ReadInt16()
+        public int ReadInt32()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
-                return dataIn.ReadInt16();
+                return dataIn.ReadInt32();
             }
             catch (EndOfStreamException e)
             {
@@ -260,26 +181,22 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public int ReadInt32()
+        public void WriteInt32(int value)
         {
-            InitializeReadingMode();
+            InitializeWriting();
             try
             {
-                return dataIn.ReadInt32();
-            }
-            catch (EndOfStreamException e)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(e);
+                this.dataOut.Write(value);
             }
             catch (IOException e)
             {
-                throw NMSExceptionSupport.CreateMessageFormatException(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
         public long ReadInt64()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
                 return dataIn.ReadInt64();
@@ -294,30 +211,25 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public float ReadSingle()
+        public void WriteInt64(long value)
         {
-            InitializeReadingMode();
+            InitializeWriting();
             try
             {
-                return dataIn.ReadSingle();
-            }
-            catch (EndOfStreamException e)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(e);
+                this.dataOut.Write(value);
             }
             catch (IOException e)
             {
-                throw NMSExceptionSupport.CreateMessageFormatException(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public string ReadString()
+        public float ReadSingle()
         {
-            InitializeReadingMode();
+            InitializeReading();
             try
             {
-                // Note if dataIn is an EndianBinaryReader the string length is read as 16bit short
-                return dataIn.ReadString();
+                return dataIn.ReadSingle();
             }
             catch (EndOfStreamException e)
             {
@@ -328,92 +240,90 @@ namespace Apache.NMS.AMQP.Message
                 throw NMSExceptionSupport.CreateMessageFormatException(e);
             }
         }
-        
-        public void WriteBoolean(bool value)
+
+        public void WriteSingle(float value)
         {
-            InitializeWritingMode();
+            InitializeWriting();
             try
             {
-                dataOut.Write(value);
+                this.dataOut.Write(value);
             }
-            catch (Exception e)
+            catch (IOException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public void WriteByte(byte value)
+        public double ReadDouble()
         {
-            InitializeWritingMode();
+            InitializeReading();
             try
             {
-                dataOut.Write(value);
+                return dataIn.ReadDouble();
             }
-            catch(Exception e)
+            catch (EndOfStreamException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch (IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
             }
-
         }
 
-        public void WriteBytes(byte[] value)
+        public void WriteDouble(double value)
         {
-            InitializeWritingMode();
+            InitializeWriting();
             try
             {
-                dataOut.Write(value, 0, value.Length);
+                this.dataOut.Write(value);
             }
-            catch (Exception e)
+            catch (IOException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
-        public void WriteBytes(byte[] value, int offset, int length)
+        public int ReadBytes(byte[] value)
         {
-            InitializeWritingMode();
-            try
-            {
-                dataOut.Write(value, offset, length);
-            }
-            catch (Exception e)
-            {
-                throw NMSExceptionSupport.Create(e);
-            }
+            return ReadBytes(value, value.Length);
         }
 
-        public void WriteChar(char value)
+        public int ReadBytes(byte[] value, int length)
         {
-            InitializeWritingMode();
-            try
+            InitializeReading();
+
+            if (length < 0 || value.Length < length)
             {
-                dataOut.Write(value);
+                throw new IndexOutOfRangeException("length must not be negative or larger than the size of the provided array");
             }
-            catch (Exception e)
-            {
-                throw NMSExceptionSupport.Create(e);
-            }
-        }
 
-        public void WriteDouble(double value)
-        {
-            InitializeWritingMode();
             try
             {
-                dataOut.Write(value);
+                return dataIn.Read(value, 0, length);
             }
-            catch (Exception e)
+            catch (EndOfStreamException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch (IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
             }
         }
 
-        public void WriteInt16(short value)
+        public void WriteBytes(byte[] value)
+        {
+            WriteBytes(value, 0, value.Length);
+        }
+
+        public void WriteBytes(byte[] value, int offset, int length)
         {
-            InitializeWritingMode();
+            InitializeWriting();
+
             try
             {
-                dataOut.Write(value);
+                dataOut.Write(value, offset, length);
             }
             catch (Exception e)
             {
@@ -421,118 +331,123 @@ namespace Apache.NMS.AMQP.Message
             }
         }
 
-        public void WriteInt32(int value)
+        public string ReadString()
         {
-            InitializeWritingMode();
+            InitializeReading();
             try
             {
-                dataOut.Write(value);
+                return dataIn.ReadString();
             }
-            catch (Exception e)
+            catch (EndOfStreamException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch (IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
             }
         }
 
-        public void WriteInt64(long value)
+        public void WriteString(string value)
         {
-            InitializeWritingMode();
+            InitializeWriting();
             try
             {
-                dataOut.Write(value);
+                this.dataOut.Write(value);
             }
-            catch (Exception e)
+            catch (IOException e)
             {
-                throw NMSExceptionSupport.Create(e);
+                throw NMSExceptionSupport.CreateMessageFormatException(e); // propagate the write failure instead of discarding it
             }
         }
 
         public void WriteObject(object value)
         {
-            InitializeWritingMode();
-            
-            Type objType = value.GetType();
-            if(value is byte[])
-            {
-                dataOut.Write((byte[])value);
-            }
-            else if (objType.IsPrimitive)
-            {
-                if(value is Byte)
-                {
-                    dataOut.Write((byte)value);
-                }
-                else if (value is Char)
-                {
-                    dataOut.Write((char)value);
-                }
-                else if (value is Boolean)
-                {
-                    dataOut.Write((bool)value);
-                }
-                else if (value is Int16)
-                {
-                    dataOut.Write((short)value);
-                }
-                else if (value is Int32)
-                {
-                    dataOut.Write((int)value);
-                }
-                else if (value is Int64)
-                {
-                    dataOut.Write((long)value);
-                }
-                else if (value is Single)
-                {
-                    dataOut.Write((float)value);
-                }
-                else if (value is Double)
-                {
-                    dataOut.Write((double)value);
-                }
-                else if (value is String)
-                {
-                    dataOut.Write((string) value);
-                }
-                else
-                {
-                    throw new MessageFormatException("Cannot write primitive type:" + objType);
-                }
-            }
+            if (value == null)
+                throw new ArgumentNullException(nameof(value));
+
+            if (value is byte byteValue)
+                WriteByte(byteValue);
+            else if (value is char charValue)
+                WriteChar(charValue);
+            else if (value is bool boolValue)
+                WriteBoolean(boolValue);
+            else if (value is short shortValue)
+                WriteInt16(shortValue);
+            else if (value is int intValue)
+                WriteInt32(intValue);
+            else if (value is long longValue)
+                WriteInt64(longValue);
+            else if (value is float floatValue)
+                WriteSingle(floatValue);
+            else if (value is double doubleValue)
+                WriteDouble(doubleValue);
+            else if (value is string stringValue)
+                WriteString(stringValue);
+            else if (value is byte[] bytes)
+                WriteBytes(bytes);
             else
-            {
-                throw new MessageFormatException("Cannot write non-primitive type:" + objType);
-            }
-            
+                throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType().FullName);
         }
 
-        public void WriteSingle(float value)
+        public override void ClearBody()
         {
-            InitializeWritingMode();
-            try
-            {
-                dataOut.Write(value);
-            }
-            catch (Exception e)
+            base.ClearBody();
+            this.dataIn = null;
+            this.dataOut = null;
+        }
+
+        public override void OnSend(TimeSpan producerTtl)
+        {
+            Reset();
+            base.OnSend(producerTtl);
+        }
+
+        public void Reset()
+        {
+            this.facade.Reset();
+            this.dataOut = null;
+            this.dataIn = null;
+            IsReadOnlyBody = true;
+        }
+
+        public long BodyLength
+        {
+            get
             {
-                throw NMSExceptionSupport.Create(e);
+                InitializeReading();
+                return facade.BodyLength;
             }
         }
 
-        public void WriteString(string value)
+        public override string ToString()
         {
-            InitializeWritingMode();
-            try
+            return $"NmsBytesMessage {{ {Facade} }}";
+        }
+
+        private void InitializeWriting()
+        {
+            CheckReadOnlyBody();
+            if (dataOut == null)
             {
-                // note if dataOut is an EndianBinaryWriter, strings are written with a 16bit short length.
-                dataOut.Write(value);
+                dataOut = facade.GetDataWriter();
             }
-            catch (Exception e)
+        }
+
+        private void InitializeReading()
+        {
+            CheckWriteOnlyBody();
+            if (dataIn?.BaseStream == null)
             {
-                throw NMSExceptionSupport.Create(e);
+                dataIn = facade.GetDataReader();
             }
         }
 
-        #endregion
+        public override NmsMessage Copy()
+        {
+            NmsBytesMessage copy = new NmsBytesMessage(facade.Copy() as INmsBytesMessageFacade);
+            CopyInto(copy);
+            return copy;
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Util/AtomicSequence.cs b/src/NMS.AMQP/Message/NmsMapMessage.cs
similarity index 52%
copy from src/NMS.AMQP/Util/AtomicSequence.cs
copy to src/NMS.AMQP/Message/NmsMapMessage.cs
index fd59a94..0d74176 100644
--- a/src/NMS.AMQP/Util/AtomicSequence.cs
+++ b/src/NMS.AMQP/Message/NmsMapMessage.cs
@@ -14,42 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+
+using Apache.NMS.AMQP.Message.Facade;
 using Apache.NMS.Util;
 
-namespace Apache.NMS.AMQP.Util
+namespace Apache.NMS.AMQP.Message
 {
-    /// <summary>
-    /// Simple utility class used mainly for Id generation.
-    /// </summary>
-    class AtomicSequence : Atomic<ulong>
+    public class NmsMapMessage : NmsMessage, IMapMessage
     {
-        public AtomicSequence() : base()
-        {
-        }
+        private readonly INmsMapMessageFacade facade;
+        private PrimitiveMapInterceptor map;
 
-        public AtomicSequence(ulong defaultValue) : base(defaultValue)
+        public NmsMapMessage(INmsMapMessageFacade facade) : base(facade)
         {
+            this.facade = facade;
         }
 
-        public ulong getAndIncrement()
+        public override bool IsReadOnly
         {
-            ulong val = 0;
-            lock (this)
+            get => base.IsReadOnly;
+            set
             {
-                val = atomicValue;
-                atomicValue++;
+                if (map != null) 
+                    map.ReadOnly = value;
+
+                base.IsReadOnly = value;
             }
-            return val;
         }
 
+        public IPrimitiveMap Body => map ?? (map = new PrimitiveMapInterceptor(this, facade.Map, IsReadOnly, true));
+
         public override string ToString()
         {
-            return Value.ToString();
+            return $"NmsMapMessage {{ {Facade} }}";
+        }
+        
+        public override NmsMessage Copy()
+        {
+            NmsMapMessage copy = new NmsMapMessage(facade.Copy() as INmsMapMessageFacade);
+            CopyInto(copy);
+            return copy;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessage.cs b/src/NMS.AMQP/Message/NmsMessage.cs
new file mode 100644
index 0000000..1f81717
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsMessage.cs
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Message.Facade;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+    public class NmsMessage : IMessage
+    {
+        private MessagePropertyIntercepter properties;
+        private bool readOnlyProperties;
+
+        public NmsMessage(INmsMessageFacade facade)
+        {
+            Facade = facade;
+        }
+
+        public INmsMessageFacade Facade { get; }
+
+        public IPrimitiveMap Properties => properties ?? (properties = new MessagePropertyIntercepter(this, Facade.Properties, IsReadOnlyProperties));
+
+        public string NMSCorrelationID
+        {
+            get => Facade.NMSCorrelationID;
+            set => Facade.NMSCorrelationID = value;
+        }
+
+        public IDestination NMSDestination
+        {
+            get => Facade.NMSDestination;
+            set => Facade.NMSDestination = value;
+        }
+
+        public TimeSpan NMSTimeToLive
+        {
+            get => Facade.NMSTimeToLive;
+            set => Facade.NMSTimeToLive = value;
+        }
+
+        public string NMSMessageId
+        {
+            get => Facade.NMSMessageId;
+            set => Facade.NMSMessageId = value;
+        }
+
+        public MsgDeliveryMode NMSDeliveryMode
+        {
+            get => Facade.NMSDeliveryMode;
+            set => Facade.NMSDeliveryMode = value;
+        }
+
+        public MsgPriority NMSPriority
+        {
+            get => Facade.NMSPriority;
+            set => Facade.NMSPriority = value;
+        }
+
+        public bool NMSRedelivered
+        {
+            get => Facade.NMSRedelivered;
+            set => Facade.NMSRedelivered = value;
+        }
+
+        public IDestination NMSReplyTo
+        {
+            get => Facade.NMSReplyTo;
+            set => Facade.NMSReplyTo = value;
+        }
+
+        public DateTime NMSTimestamp
+        {
+            get => Facade.NMSTimestamp;
+            set => Facade.NMSTimestamp = value;
+        }
+
+        public string NMSType
+        {
+            get => Facade.NMSType;
+            set => Facade.NMSType = value;
+        }
+
+        public NmsAcknowledgeCallback NmsAcknowledgeCallback { get; set; }
+
+        public virtual bool IsReadOnly { get; set; }
+
+        public bool IsReadOnlyBody { get; set; }
+
+        public bool IsReadOnlyProperties
+        {
+            get => readOnlyProperties;
+            set
+            {
+                readOnlyProperties = value;
+                if (properties != null)
+                {
+                    properties.ReadOnly = value;
+                }
+            }
+        }
+
+        public void Acknowledge()
+        {
+            if (NmsAcknowledgeCallback != null)
+            {
+                try
+                {
+                    NmsAcknowledgeCallback.Acknowledge();
+                    NmsAcknowledgeCallback = null;
+                }
+                catch (Exception e)
+                {
+                    throw NMSExceptionSupport.Create(e);
+                }
+            }
+        }
+
+        public virtual void ClearBody()
+        {
+            CheckReadOnly();
+            IsReadOnlyBody = false;
+            Facade.ClearBody();
+        }
+
+        public void ClearProperties()
+        {
+            CheckReadOnly();
+            IsReadOnlyProperties = false;
+            Properties.Clear();
+        }
+
+        public virtual void OnSend(TimeSpan producerTtl)
+        {
+            IsReadOnly = true;
+            Facade.OnSend(producerTtl);
+        }
+
+        public void OnDispatch()
+        {
+            IsReadOnly = false;
+            IsReadOnlyBody = true;
+            IsReadOnlyProperties = true;
+        }
+
+        public override string ToString()
+        {
+            return $"NmsMessage {{ {Facade} }}";
+        }
+
+        protected bool Equals(NmsMessage other)
+        {
+            if (other.NMSMessageId == null)
+                return false;
+
+            return string.Equals(NMSMessageId, other.NMSMessageId);
+        }
+
+        public override bool Equals(object obj)
+        {
+            if (ReferenceEquals(null, obj)) return false;
+            if (ReferenceEquals(this, obj)) return true;
+            if (obj.GetType() != this.GetType()) return false;
+            return Equals((NmsMessage) obj);
+        }
+
+        public override int GetHashCode()
+        {
+            return (NMSMessageId != null ? NMSMessageId.GetHashCode() : base.GetHashCode());
+        }
+
+        //----- State validation methods -----------------------------------------//
+
+        protected void CheckReadOnly() // Guard called by ClearBody and ClearProperties: throws when IsReadOnly is set.
+        {
+            if (IsReadOnly)
+            {
+                throw new MessageNotReadableException("Message is currently read-only"); // NOTE(review): CheckReadOnlyBody throws MessageNotWriteableException for a read-only message; confirm which exception type is intended here.
+            }
+        }
+
+        protected void CheckWriteOnlyBody()
+        {
+            if (!IsReadOnlyBody)
+            {
+                throw new MessageNotReadableException("Message body is write-only");
+            }
+        }
+
+        protected void CheckReadOnlyBody()
+        {
+            if (IsReadOnly || IsReadOnlyBody)
+            {
+                throw new MessageNotWriteableException("Message body is read-only");
+            }
+        }
+
+        public bool IsExpired()
+        {
+            DateTime expireTime = Facade.Expiration;
+            return expireTime > DateTime.MinValue && DateTime.UtcNow > expireTime;
+        }
+
+        public virtual NmsMessage Copy()
+        {
+            NmsMessage copy = new NmsMessage(Facade.Copy());
+            CopyInto(copy);
+            return copy;
+        }
+
+        protected void CopyInto(NmsMessage target)
+        {
+            target.IsReadOnly = IsReadOnly;
+            target.IsReadOnlyBody = IsReadOnlyBody;
+            target.IsReadOnlyProperties = IsReadOnlyProperties;
+            target.NmsAcknowledgeCallback = NmsAcknowledgeCallback;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsMessageTransformation.cs b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
new file mode 100644
index 0000000..ef1acb1
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsMessageTransformation.cs
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections;
+
+namespace Apache.NMS.AMQP.Message
+{
+    public static class NmsMessageTransformation
+    {
+        public static NmsMessage TransformMessage(INmsMessageFactory factory, IMessage message) // Creates an NmsMessage from the factory matching the source's body type, copies the body, then headers and properties.
+        {
+            NmsMessage nmsMessage = null;
+
+            if (message is IBytesMessage bytesMessage)
+            {
+                bytesMessage.Reset();
+                NmsBytesMessage msg = factory.CreateBytesMessage();
+
+                try
+                {
+                    while (true)
+                    {
+                        // Reads a byte from the message stream until the stream is empty
+                        msg.WriteByte(bytesMessage.ReadByte());
+                    }
+                }
+                catch (MessageEOFException)
+                {
+                    // Indicates all the bytes have been read from the source.
+                }
+
+                nmsMessage = msg;
+            }
+            else if (message is IMapMessage mapMessage)
+            {
+                NmsMapMessage msg = factory.CreateMapMessage();
+                CopyMap(mapMessage.Body, msg.Body);
+                nmsMessage = msg;
+            }
+            else if (message is IObjectMessage objectMessage)
+            {
+                NmsObjectMessage msg = factory.CreateObjectMessage();
+                msg.Body = objectMessage.Body;
+                nmsMessage = msg;
+            }
+            else if (message is IStreamMessage streamMessage)
+            {
+                streamMessage.Reset();
+                NmsStreamMessage msg = factory.CreateStreamMessage();
+                
+                try
+                {
+                    while (true)
+                    {
+                        // Reads one entry at a time from the source stream until MessageEOFException ends the loop
+                        msg.WriteObject(streamMessage.ReadObject());
+                    }
+                }
+                catch (MessageEOFException)
+                {
+                    // Indicates all the entries have been read from the source stream.
+                }
+
+                nmsMessage = msg;
+            }
+            else if (message is ITextMessage textMessage)
+            {
+                NmsTextMessage msg = factory.CreateTextMessage();
+                msg.Text = textMessage.Text;
+                nmsMessage = msg;
+            }
+            else
+                nmsMessage = factory.CreateMessage();
+
+
+            CopyProperties(message, nmsMessage);
+            return nmsMessage;
+        }
+
+        private static void CopyProperties(IMessage source, NmsMessage target)
+        {
+            target.NMSMessageId = source.NMSMessageId;
+            target.NMSCorrelationID = source.NMSCorrelationID;
+            target.NMSDestination = source.NMSDestination;
+            target.NMSReplyTo = source.NMSReplyTo;
+            target.NMSDeliveryMode = source.NMSDeliveryMode;
+            target.NMSRedelivered = source.NMSRedelivered;
+            target.NMSType = source.NMSType;
+            target.NMSPriority = source.NMSPriority;
+            target.NMSTimestamp = source.NMSTimestamp;
+            
+            CopyMap(source.Properties, target.Properties);
+        }
+
+        private static void CopyMap(IPrimitiveMap source, IPrimitiveMap target)
+        {
+            foreach (object key in source.Keys)
+            {
+                string name = key.ToString();
+                object value = source[name];
+
+                switch (value)
+                {
+                    case bool boolValue:
+                        target.SetBool(name, boolValue);
+                        break;
+                    case char charValue:
+                        target.SetChar(name, charValue);
+                        break;
+                    case string stringValue:
+                        target.SetString(name, stringValue);
+                        break;
+                    case byte byteValue:
+                        target.SetByte(name, byteValue);
+                        break;
+                    case short shortValue:
+                        target.SetShort(name, shortValue);
+                        break;
+                    case int intValue:
+                        target.SetInt(name, intValue);
+                        break;
+                    case long longValue:
+                        target.SetLong(name, longValue);
+                        break;
+                    case float floatValue:
+                        target.SetFloat(name, floatValue);
+                        break;
+                    case double doubleValue:
+                        target.SetDouble(name, doubleValue);
+                        break;
+                    case byte[] bytesValue:
+                        target.SetBytes(name, bytesValue);
+                        break;
+                    case IList listValue:
+                        target.SetList(name, listValue);
+                        break;
+                    case IDictionary dictionaryValue:
+                        target.SetDictionary(name, dictionaryValue);
+                        break;
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/ObjectMessage.cs b/src/NMS.AMQP/Message/NmsObjectMessage.cs
similarity index 55%
copy from src/NMS.AMQP/Message/ObjectMessage.cs
copy to src/NMS.AMQP/Message/NmsObjectMessage.cs
index d2a7054..556f9c2 100644
--- a/src/NMS.AMQP/Message/ObjectMessage.cs
+++ b/src/NMS.AMQP/Message/NmsObjectMessage.cs
@@ -14,60 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
-using System.Runtime.Serialization;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
+using Apache.NMS.AMQP.Message.Facade;
 
 namespace Apache.NMS.AMQP.Message
 {
-    using Cloak;
-    class ObjectMessage : Message, IObjectMessage
+    public class NmsObjectMessage : NmsMessage, IObjectMessage
     {
-        protected new readonly IObjectMessageCloak cloak;
-        internal ObjectMessage(IObjectMessageCloak message) : base(message)
-        {
-            this.cloak = message;
-        }
+        private readonly INmsObjectMessageFacade facade;
 
-        public new byte[] Content
+        public NmsObjectMessage(INmsObjectMessageFacade facade) : base(facade)
         {
-            get
-            {
-                return cloak.Content;
-            }
-
-            set
-            {
-                
-            }
+            this.facade = facade;
         }
 
         public object Body
         {
-            get
-            {
-                return cloak.Body;
-            }
+            get => this.facade.Body;
             set
             {
-                
+                CheckReadOnlyBody();
                 try
                 {
-                    cloak.Body = value;
+                    this.facade.Body = value;
                 }
-                catch (SerializationException se)
+                catch (Exception e)
                 {
-                    throw NMSExceptionSupport.CreateMessageFormatException(se);
+                    throw new MessageFormatException("Failed to serialize object", e);
                 }
-                
             }
         }
-
-        internal override Message Copy()
+        
+        public override string ToString()
+        {
+            return $"NmsObjectMessage {{ {Facade} }}";
+        }
+        
+        public override NmsMessage Copy()
         {
-            return new ObjectMessage(this.cloak.Copy());
+            NmsObjectMessage copy = new NmsObjectMessage(facade.Copy() as INmsObjectMessageFacade);
+            CopyInto(copy);
+            return copy;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/NmsStreamMessage.cs b/src/NMS.AMQP/Message/NmsStreamMessage.cs
new file mode 100644
index 0000000..03d6150
--- /dev/null
+++ b/src/NMS.AMQP/Message/NmsStreamMessage.cs
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Globalization;
+using System.Runtime.InteropServices.ComTypes;
+using Apache.NMS.AMQP.Message.Facade;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+    public class NmsStreamMessage : NmsMessage, IStreamMessage
+    {
+        private const int NO_BYTES_IN_FLIGHT = -1;
+        private readonly INmsStreamMessageFacade facade;
+
+        private int remainingBytes = NO_BYTES_IN_FLIGHT;
+        private byte[] bytes;
+
+        public NmsStreamMessage(INmsStreamMessageFacade facade) : base(facade)
+        {
+            this.facade = facade;
+        }
+
+        public override string ToString()
+        {
+            return "NmsStreamMessage { " + facade + " }";
+        }
+
+        public bool ReadBoolean() // Reads the next stream entry as a bool; a null entry reads as false, other non-bool/non-string types throw MessageFormatException.
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            bool result;
+            object value = facade.Peek();
+
+            if (value is bool boolResult)
+                result = boolResult;
+            else if (value is string stringValue)
+                result = string.Equals(stringValue, bool.TrueString, StringComparison.OrdinalIgnoreCase); // case-insensitive so "true" is accepted as well as "True"
+            else if (value is null)
+                result = false;
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to boolean.");
+
+            facade.Pop(); // reached only when no exception was thrown above, so a failed read leaves the entry in place
+            return result;
+        }
+
+        private void CheckBytesInFlight()
+        {
+            if (remainingBytes != NO_BYTES_IN_FLIGHT)
+            {
+                throw new MessageFormatException("Partially read byte[] entry still being retrieved using readBytes(byte[] dest)");
+            }
+        }
+
+        public byte ReadByte()
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            byte result;
+            object value = facade.Peek();
+            if (value is byte byteValue)
+                result = byteValue;
+            else if (value is string stringValue && byte.TryParse(stringValue, out var parsedValue))
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to byte.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to byte.");
+
+            facade.Pop();
+            return result;
+        }
+
+        public int ReadBytes(byte[] value)
+        {
+            CheckWriteOnlyBody();
+
+            if (value == null)
+            {
+                throw new ArgumentNullException(nameof(value), "Target byte array was null");
+            }
+
+            if (remainingBytes == NO_BYTES_IN_FLIGHT)
+            {
+                object data = facade.Peek();
+                if (data == null)
+                {
+                    return -1;
+                }
+                else if (!(data is byte[]))
+                {
+                    throw new MessageFormatException("Next stream value is not a byte array.");
+                }
+                bytes = data as byte[];
+                remainingBytes = bytes.Length;
+            }
+            else if (remainingBytes == 0)
+            {
+                // We previously read all the bytes, but must have filled the destination array.
+                remainingBytes = NO_BYTES_IN_FLIGHT;
+                bytes = null;
+                facade.Pop();
+                return -1;
+            }
+
+            int previouslyRead = bytes.Length - remainingBytes;
+            int lengthToCopy = Math.Min(value.Length, remainingBytes);
+
+            if (lengthToCopy > 0)
+            {
+                Array.Copy(bytes, previouslyRead, value, 0, lengthToCopy);
+            }
+
+            remainingBytes -= lengthToCopy;
+
+            if (remainingBytes == 0 && lengthToCopy < value.Length)
+            {
+                // All bytes have been read and the destination array was not filled on this
+                // call, so the return will enable the caller to determine completion immediately.
+                remainingBytes = NO_BYTES_IN_FLIGHT;
+                bytes = null;
+                facade.Pop();
+            }
+
+            return lengthToCopy;
+        }
+
+        public char ReadChar()
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            char result;
+            object value = facade.Peek();
+            if (value is char charValue)
+                result = charValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to char.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to char.");
+
+            facade.Pop();
+            return result;
+        }
+
+        public short ReadInt16()
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            short result;
+            object value = facade.Peek();
+            if (value is short shortValue)
+                result = shortValue;
+            else if (value is byte byteValue)
+                result = byteValue;
+            else if (value is string stringValue && short.TryParse(stringValue, out var parsedValue))
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to short.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to short.");
+
+            facade.Pop();
+            return result;
+        }
+
+        public int ReadInt32()
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            int result;
+            object value = facade.Peek();
+            if (value is int intValue)
+                result = intValue;
+            else if (value is short shortValue)
+                result = shortValue;
+            else if (value is byte byteValue)
+                result = byteValue;
+            else if (value is string stringValue && int.TryParse(stringValue, out var parsedValue))
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to int.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to int.");
+
+            facade.Pop();
+            return result;
+        }
+
+        public long ReadInt64()
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            long result;
+            object value = facade.Peek();
+            if (value is long longValue)
+                result = longValue;
+            else if (value is int intValue)
+                result = intValue;
+            else if (value is short shortValue)
+                result = shortValue;
+            else if (value is byte byteValue)
+                result = byteValue;
+            else if (value is string stringValue && long.TryParse(stringValue, out var parsedValue))
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to long.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to long.");
+
+            facade.Pop();
+            return result;
+        }
+
+        public float ReadSingle() // Reads the next stream entry as a float; accepts float or a parsable string, throws on null or any other type.
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            float result;
+            object value = facade.Peek();
+            if (value is float floatValue)
+                result = floatValue;
+            else if (value is string stringValue && float.TryParse(stringValue, NumberStyles.Float | NumberStyles.AllowThousands, CultureInfo.InvariantCulture, out var parsedValue)) // invariant culture matches how ReadString formats floats
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to float.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to float.");
+
+            facade.Pop(); // reached only when no exception was thrown above
+            return result;
+        }
+
+        public double ReadDouble() // Reads the next stream entry as a double; accepts double, float or a parsable string, throws on null or any other type.
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            double result;
+            object value = facade.Peek();
+            if (value is double doubleValue)
+                result = doubleValue;
+            else if (value is float floatValue)
+                result = floatValue;
+            else if (value is string stringValue && double.TryParse(stringValue, NumberStyles.Float | NumberStyles.AllowThousands, CultureInfo.InvariantCulture, out var parsedValue)) // parse as double (not float) to keep full precision; invariant culture matches ReadString
+                result = parsedValue;
+            else if (value is null)
+                throw new NullReferenceException("Cannot convert null value to double.");
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to double.");
+
+            facade.Pop(); // reached only when no exception was thrown above
+            return result;
+        }
+
+        public string ReadString() // Reads the next stream entry as a string; null stays null, primitives are formatted, other types throw MessageFormatException.
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            string result;
+            object value = facade.Peek();
+            if (value == null)
+                result = null;
+            else if (value is string stringValue)
+                result = stringValue;
+            else if (value is float floatValue)
+                result = floatValue.ToString(CultureInfo.InvariantCulture);
+            else if (value is double doubleValue)
+                result = doubleValue.ToString(CultureInfo.InvariantCulture);
+            else if (value is long longValue)
+                result = longValue.ToString(CultureInfo.InvariantCulture); // invariant, like the float/double branches, so the text does not vary by locale
+            else if (value is int intValue)
+                result = intValue.ToString(CultureInfo.InvariantCulture);
+            else if (value is short shortValue)
+                result = shortValue.ToString(CultureInfo.InvariantCulture);
+            else if (value is byte byteValue)
+                result = byteValue.ToString(CultureInfo.InvariantCulture);
+            else if (value is bool boolValue)
+                result = boolValue.ToString();
+            else if (value is char charValue)
+                result = charValue.ToString();
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be converted to string."); // e.g. a byte[] entry ends up here
+
+            facade.Pop(); // reached only when no exception was thrown above
+            return result;
+        }
+
+        public object ReadObject() // Reads the next stream entry as-is; byte[] entries are returned as a fresh copy, unsupported types throw MessageFormatException.
+        {
+            CheckWriteOnlyBody();
+            CheckBytesInFlight();
+
+            object result;
+            object value = facade.Peek();
+            if (value == null)
+                result = null;
+            else if (value is string)
+                result = value;
+            else if (value is float)
+                result = value;
+            else if (value is double)
+                result = value;
+            else if (value is long)
+                result = value;
+            else if (value is int)
+                result = value;
+            else if (value is short)
+                result = value;
+            else if (value is byte)
+                result = value;
+            else if (value is bool)
+                result = value;
+            else if (value is char)
+                result = value;
+            else if (value is byte[] original)
+            {
+                byte[] bytesResult = new byte[original.Length]; // hand back a copy so the caller cannot mutate the stored array
+                Array.Copy(original, 0, bytesResult, 0, original.Length);
+                result = bytesResult;
+            }
+            else
+                throw new MessageFormatException("stream value: " + value.GetType().Name + " cannot be read as an object.");
+
+            facade.Pop(); // reached only when no exception was thrown above
+            return result;
+        }
+
+        public void WriteBoolean(bool value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteByte(byte value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteBytes(byte[] value)
+        {
+            WriteBytes(value, 0, value.Length);
+        }
+
+        public void WriteBytes(byte[] value, int offset, int length)
+        {
+            CheckReadOnlyBody();
+
+            byte[] entry = new byte[length];
+            Array.Copy(value, offset, entry, 0, length);
+            facade.Put(entry);
+        }
+
+        public void WriteChar(char value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteInt16(short value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteInt32(int value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteInt64(long value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteSingle(float value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteDouble(double value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteString(string value)
+        {
+            CheckReadOnlyBody();
+            facade.Put(value);
+        }
+
+        public void WriteObject(object value)
+        {
+            CheckReadOnlyBody();
+
+            switch (value)
+            {
+                case null:
+                case string _:
+                case char _:
+                case bool _:
+                case byte _:
+                case short _:
+                case int _:
+                case long _:
+                case float _:
+                case double _:
+                    facade.Put(value);
+                    break;
+                case byte[] bytesValue:
+                    WriteBytes(bytesValue);
+                    break;
+                default:
+                    throw new MessageFormatException("Unsupported Object type: " + value.GetType().Name);
+            }
+        }
+
+        public override void ClearBody()
+        {
+            base.ClearBody();
+            bytes = null;
+            remainingBytes = NO_BYTES_IN_FLIGHT;
+        }
+
+        public void Reset()
+        {
+            bytes = null;
+            remainingBytes = NO_BYTES_IN_FLIGHT;
+            IsReadOnlyBody = true;
+            facade.Reset();
+        }
+        
+        public override NmsMessage Copy()
+        {
+            NmsStreamMessage copy = new NmsStreamMessage(facade.Copy() as INmsStreamMessageFacade);
+            CopyInto(copy);
+            return copy;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Util/AtomicSequence.cs b/src/NMS.AMQP/Message/NmsTextMessage.cs
similarity index 58%
copy from src/NMS.AMQP/Util/AtomicSequence.cs
copy to src/NMS.AMQP/Message/NmsTextMessage.cs
index fd59a94..9334a36 100644
--- a/src/NMS.AMQP/Util/AtomicSequence.cs
+++ b/src/NMS.AMQP/Message/NmsTextMessage.cs
@@ -14,42 +14,58 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS.Util;
 
-namespace Apache.NMS.AMQP.Util
+using Apache.NMS.AMQP.Message.Facade;
+
+namespace Apache.NMS.AMQP.Message
 {
-    /// <summary>
-    /// Simple utility class used mainly for Id generation.
-    /// </summary>
-    class AtomicSequence : Atomic<ulong>
+    /// <summary>
+    /// NMS text message whose <see cref="Text"/> body is read from and written to an
+    /// <see cref="INmsTextMessageFacade"/>.
+    /// </summary>
+    public class NmsTextMessage : NmsMessage, ITextMessage
     {
-        public AtomicSequence() : base()
-        {
-        }
+        private readonly INmsTextMessageFacade facade;
 
-        public AtomicSequence(ulong defaultValue) : base(defaultValue)
+        /// <summary>Wraps <paramref name="facade"/>, which is also passed to the base constructor.</summary>
+        public NmsTextMessage(INmsTextMessageFacade facade) : base(facade)
         {
+            this.facade = facade;
         }
 
-        public ulong getAndIncrement()
+        /// <summary>
+        /// Text body of the message, read from and written to the facade. The setter runs
+        /// <c>CheckReadOnlyBody()</c> first; presumably that rejects the write while the body is
+        /// read-only (TODO confirm in <see cref="NmsMessage"/>).
+        /// </summary>
+        public string Text
         {
-            ulong val = 0;
-            lock (this)
+            get => facade.Text;
+            set
             {
-                val = atomicValue;
-                atomicValue++;
+                CheckReadOnlyBody();
+                facade.Text = value;
             }
-            return val;
         }
 
         public override string ToString()
         {
-            return Value.ToString();
+            return "NmsTextMessage { " + Text + " }";
+        }
+        
+        /// <summary>
+        /// Builds a new <see cref="NmsTextMessage"/> over a copy of the facade and then runs
+        /// <c>CopyInto</c> on it.
+        /// </summary>
+        /// <exception cref="System.InvalidCastException">
+        /// The facade's <c>Copy()</c> returned an object that is not an <see cref="INmsTextMessageFacade"/>.
+        /// </exception>
+        public override NmsMessage Copy()
+        {
+            // Direct cast instead of `as`: a facade copy of the wrong type fails here, not later as a null facade.
+            NmsTextMessage copy = new NmsTextMessage((INmsTextMessageFacade) facade.Copy());
+            CopyInto(copy);
+            return copy;
         }
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
similarity index 71%
copy from src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
copy to src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index acb649a..f69b83f 100644
--- a/src/NMS.AMQP/Message/Cloak/IMapMessageCloak.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -14,19 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
 
-namespace Apache.NMS.AMQP.Message.Cloak
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
 {
-    interface IMapMessageCloak : IMessageCloak
+    public class OutboundMessageDispatch // plain property bag: four auto-properties, no behaviour of its own
     {
-        IPrimitiveMap Map { get; }
-        new IMapMessageCloak Copy();
+        public Id ProducerId { get; set; } // presumably the id of the producer sending Message - TODO confirm against callers
+        public ProducerInfo ProducerInfo { get; set; } // presumably the metadata of that same producer - TODO confirm
+        public NmsMessage Message { get; set; } // the message carried by this dispatch
+        public bool SendAsync { get; set; } // NOTE(review): name suggests "send without waiting for the outcome"; semantics are not visible here
     }
-}
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/StreamMessage.cs b/src/NMS.AMQP/Message/StreamMessage.cs
deleted file mode 100644
index 3891df6..0000000
--- a/src/NMS.AMQP/Message/StreamMessage.cs
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.IO;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util.Types;
-
-namespace Apache.NMS.AMQP.Message
-{
-    using Cloak;
-    class StreamMessage : Message, IStreamMessage
-    {
-        
-        private const int NO_BYTES_IN_BUFFER = -1;
-
-        private readonly new IStreamMessageCloak cloak;
-
-        private int RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
-
-        private byte[] Buffer = null;
-
-        internal StreamMessage(IStreamMessageCloak message) : base(message)
-        {
-            cloak = message;
-        }
-        
-        #region IStreamMessage Methods
-
-        public bool ReadBoolean()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            bool result;
-            object value = cloak.Peek();
-            if(value == null)
-            {
-                result = Convert.ToBoolean(value);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<bool>(value);
-            }
-            cloak.Pop();
-            return result;
-        }
-
-        public byte ReadByte()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            byte result;
-            object value = cloak.Peek();
-            if(value == null)
-            {
-                result = Convert.ToByte(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<byte>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public int ReadBytes(byte[] value)
-        {
-            FailIfWriteOnlyMsgBody();
-            if (value == null)
-            {
-                throw new NullReferenceException("Target byte array is null.");
-            }
-            if (RemainingBytesInBuffer == NO_BYTES_IN_BUFFER)
-            {
-                object data = cloak.Peek();
-                if (data == null)
-                {
-                    return -1;
-                }
-                else if (!(data is byte[]))
-                {
-                    throw new MessageFormatException("Next stream value is not a byte array.");
-                }
-                Buffer = data as byte[];
-                RemainingBytesInBuffer = Buffer.Length;
-            } 
-            int bufferOffset = Buffer.Length - RemainingBytesInBuffer;
-            int copyLength = Math.Min(value.Length, RemainingBytesInBuffer);
-            if(copyLength > 0)
-                Array.Copy(Buffer, bufferOffset, value, 0, copyLength);
-            RemainingBytesInBuffer -= copyLength;
-            if(RemainingBytesInBuffer == 0)
-            {
-                RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
-                Buffer = null;
-                cloak.Pop();
-            }
-            return copyLength;
-        }
-
-        public char ReadChar()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            char result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                throw new NullReferenceException("Cannot convert NULL value to char.");
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<char>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public double ReadDouble()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            double result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToDouble(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<double>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public short ReadInt16()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            short result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToInt16(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<short>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public int ReadInt32()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            int result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToInt32(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<int>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public long ReadInt64()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            long result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToInt64(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<long>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public object ReadObject()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            object result = null;
-            object value = null;
-            try
-            {
-                value = cloak.Peek();
-                if (value == null)
-                {
-                    result = null;
-                }
-                else if (value is byte[])
-                {
-                    byte[] buffer = value as byte[];
-                    result = new byte[buffer.Length];
-                    Array.Copy(buffer, 0, result as byte[], 0, buffer.Length);
-                }
-                else if (ConversionSupport.IsNMSType(value))
-                {
-                    result = value;
-                }
-            }
-            catch (EndOfStreamException eos)
-            {
-                throw NMSExceptionSupport.CreateMessageEOFException(eos);
-            }
-            catch (IOException ioe)
-            {
-                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
-            }
-            catch (Exception e)
-            {
-                Tracer.InfoFormat("Unexpected exception caught reading Object stream. Exception = {0}", e);
-                throw NMSExceptionSupport.Create("Unexpected exception caught reading Object stream.", e);
-            }
-            cloak.Pop();
-            return result;
-        }
-
-        public float ReadSingle()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            float result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToSingle(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<float>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public string ReadString()
-        {
-            FailIfWriteOnlyMsgBody();
-            FailIfBytesInBuffer();
-            string result;
-            object value = cloak.Peek();
-            if (value == null)
-            {
-                result = Convert.ToString(null);
-            }
-            else
-            {
-                result = ConversionSupport.ConvertNMSType<string>(value);
-            }
-
-            cloak.Pop();
-            return result;
-        }
-
-        public void Reset()
-        {
-            RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
-            Buffer = null;
-            IsReadOnly = true;
-            cloak.Reset();
-        }
-
-        public override void ClearBody()
-        {
-            RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
-            Buffer = null;
-            IsReadOnly = false;
-
-            base.ClearBody();
-        }
-
-        public void WriteBoolean(bool value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteByte(byte value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteBytes(byte[] value)
-        {
-            WriteBytes(value, 0, value.Length);
-        }
-
-        public void WriteBytes(byte[] value, int offset, int length)
-        {
-            FailIfReadOnlyMsgBody();
-            byte[] entry = new byte[length];
-            Array.Copy(value, offset, entry, 0, length);
-            cloak.Put(entry);
-        }
-
-        public void WriteChar(char value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteDouble(double value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteInt16(short value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteInt32(int value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteInt64(long value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteObject(object value)
-        {
-            FailIfReadOnlyMsgBody();
-            if(value == null)
-            {
-                cloak.Put(value);
-            }
-            else if(value is byte[])
-            {
-                WriteBytes(value as byte[]);
-            }
-            else if (ConversionSupport.IsNMSType(value))
-            {
-                cloak.Put(value);
-            }
-            else
-            {
-                throw NMSExceptionSupport.CreateMessageFormatException(new Exception("Unsupported Object type: " + value.GetType().Name));
-            }
-        }
-
-        public void WriteSingle(float value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        public void WriteString(string value)
-        {
-            FailIfReadOnlyMsgBody();
-            cloak.Put(value);
-        }
-
-        #endregion
-
-        #region Validation Methods
-        
-        protected void FailIfBytesInBuffer()
-        {
-            if(RemainingBytesInBuffer != NO_BYTES_IN_BUFFER)
-            {
-                throw new MessageFormatException("Unfinished Buffered read for ReadBytes(byte[] value)");
-            }
-        }
-
-        #endregion
-
-        internal override Message Copy()
-        {
-            return new StreamMessage(this.cloak.Copy());
-        }
-    }
-}
diff --git a/src/NMS.AMQP/Message/TextMessage.cs b/src/NMS.AMQP/Message/TextMessage.cs
deleted file mode 100644
index e7a6169..0000000
--- a/src/NMS.AMQP/Message/TextMessage.cs
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-
-namespace Apache.NMS.AMQP.Message
-{
-    using Cloak;
-    /// <summary>
-    /// Apache.NMS.AMQP.Message.TextMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.ITextMessage interface.
-    /// Apache.NMS.AMQP.Message.TextMessage uses the Apache.NMS.AMQP.Message.Cloak.ITextMessageCloak interface to detach from the underlying AMQP 1.0 engine.
-    /// </summary>
-    class TextMessage : Message, ITextMessage
-    {
-        protected readonly new ITextMessageCloak cloak;
-
-        #region Constructor
-
-        internal TextMessage(ITextMessageCloak cloak) : base(cloak)
-        {
-            this.cloak = cloak;
-        }
-
-        #endregion
-
-        #region ITextMessage Properties
-
-        public string Text
-        {
-            get
-            {
-                return cloak.Text;
-            }
-
-            set
-            {
-                FailIfReadOnlyMsgBody();
-                cloak.Text = value;
-            }
-        }
-
-        #endregion
-
-        internal override Message Copy()
-        {
-            return new TextMessage(this.cloak.Copy());
-        }
-    }
-}
diff --git a/src/NMS.AMQP/MessageConsumer.cs b/src/NMS.AMQP/MessageConsumer.cs
deleted file mode 100644
index 2ee9312..0000000
--- a/src/NMS.AMQP/MessageConsumer.cs
+++ /dev/null
@@ -1,977 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Amqp;
-using Amqp.Framing;
-using Apache.NMS;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Util.Types.Queue;
-using Apache.NMS.AMQP.Message.Cloak;
-
-namespace Apache.NMS.AMQP
-{
-    /// <summary>
-    /// MessageConsumer Implement the <see cref="Apache.NMS.IMessageConsumer"/> interface. 
-    /// This class configures and manages an amqp receiver link.
-    /// The Message consumer can be configured to receive message asynchronously or synchronously.
-    /// </summary>
-    class MessageConsumer : MessageLink, IMessageConsumer
-    {
-        // The Executor threshold limits the number of message listener dispatch events that can be on the session's executor at given time.
-        private const int ExecutorThreshold = 2;
-        private ConsumerInfo consumerInfo;
-        private readonly string selector;
-        private Apache.NMS.Util.Atomic<bool> hasStarted = new Apache.NMS.Util.Atomic<bool>(false);
-        private MessageCallback OnInboundAMQPMessage;
-        private IMessageQueue messageQueue;
-        private LinkedList<IMessageDelivery> delivered;
-        private System.Threading.ManualResetEvent MessageListenerInUseEvent = new System.Threading.ManualResetEvent(true);
-        // pending Message delivery tasks counts the number of pending tasks on the Session's executor.
-        // this should optimize the number of delivery task on the executor thread.
-        private volatile int pendingMessageDeliveryTasks = 0;
-        private volatile int MaxPendingTasks = 0;
-
-        // stat counters useful for debuging
-        // TODO create statistic container for counters maybe use ConsumerInfo?
-        private int transportMsgCount = 0;
-        private int messageDispatchCount = 0;
-
-        #region Constructor
-
-        internal MessageConsumer(Session ses, Destination dest) : this(ses, dest, null, null)
-        {
-        }
-
-        internal MessageConsumer(Session ses, IDestination dest) : this(ses, dest, null, null)
-        {
-        }
-
-        internal MessageConsumer(Session ses, IDestination dest, string name, string selector, bool noLocal = false) : base(ses, dest)
-        {
-            this.selector = selector;
-            consumerInfo = new ConsumerInfo(ses.ConsumerIdGenerator.GenerateId());
-            consumerInfo.SubscriptionName = name;
-            consumerInfo.Selector = this.selector;
-            consumerInfo.NoLocal = noLocal;
-            Info = consumerInfo;
-            messageQueue =  new PriorityMessageQueue();
-            delivered = new LinkedList<IMessageDelivery>();
-            Configure();
-        }
-        
-        #endregion
-
-        #region Private Properties
-
-        protected new IReceiverLink Link
-        {
-            get { return base.Link as IReceiverLink; }
-            
-        }
-        
-        // IsBrowser is a stub for an inherited Brower subclass
-        protected virtual bool IsBrowser { get { return false; } }
-
-        #endregion
-
-        #region Internal Properties
-
-        internal Id ConsumerId { get { return this.Info.Id; } }
-
-        internal virtual bool IsDurable {  get { return this.consumerInfo.SubscriptionName != null && this.consumerInfo.SubscriptionName.Length > 0; } }
-
-        internal virtual bool HasSelector { get { return this.consumerInfo.Selector != null && this.consumerInfo.Selector.Length > 0; } }
-        
-        #endregion
-
-        #region Private Methods
-
-        private void AckReceived(IMessageDelivery delivery)
-        {
-            IMessageCloak cloak = delivery.Message.GetMessageCloak();
-            if (cloak.AckHandler != null)
-            {
-                delivered.AddLast(delivery);
-            }
-            else
-            {
-                AckConsumed(delivery);
-            }
-        }
-        
-        private void AckConsumed(IMessageDelivery delivery)
-        {
-            Message.Message nmsMessage = delivery.Message;
-            Tracer.DebugFormat("Consumer {0} Acking Accepted for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
-            delivered.Remove(delivery);
-            Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
-            this.Link.Accept(amqpMessage);
-        }
-
-        private void AckReleased(IMessageDelivery delivery)
-        {
-            Message.Message nmsMessage = delivery.Message;
-            Tracer.DebugFormat("Consumer {0} Acking Released for Message {1} ", ConsumerId, nmsMessage.NMSMessageId);
-            Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
-            this.Link.Release(amqpMessage);
-        }
-
-        private void AckRejected(IMessageDelivery delivery, NMSException ex)
-        {
-            Error err = new Error(NMSErrorCode.INTERNAL_ERROR);
-            err.Description = ex.Message;
-            AckRejected(delivery, err);
-        }
-        
-        private void AckRejected(IMessageDelivery delivery, Error err = null)
-        {
-            Tracer.DebugFormat("Consumer {0} Acking Rejected for Message {1} with error {2} ", ConsumerId, delivery.Message.NMSMessageId, err?.ToString());
-            Amqp.Message amqpMessage = (delivery.Message.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
-            this.Link.Reject(amqpMessage, err);
-        }
-
-        private void AckModified(IMessageDelivery delivery, bool deliveryFailed, bool undeliverableHere = false)
-        {
-            Message.Message nmsMessage = delivery.Message;
-            Tracer.DebugFormat("Consumer {0} Acking Modified for Message {1}{2}{3}.", ConsumerId, nmsMessage.NMSMessageId, 
-                deliveryFailed ? " Delivery Failed" : "",
-                undeliverableHere ? " Undeliveryable Here" : "");
-            Amqp.Message amqpMessage = (nmsMessage.GetMessageCloak() as Message.AMQP.AMQPMessageCloak).AMQPMessage;
-            //TODO use Link.Modified from amqpNetLite 2.0.0
-            this.Link.Modify(amqpMessage, deliveryFailed, undeliverableHere, null);
-        }
-
-        private bool IsMessageRedeliveryExceeded(IMessageDelivery delivery)
-        {
-            Message.Message message = delivery.Message;
-            IRedeliveryPolicy policy = this.Session.Connection.RedeliveryPolicy;
-            if (policy != null && policy.MaximumRedeliveries >= 0)
-            {
-                IMessageCloak msgCloak = message.GetMessageCloak();
-                return msgCloak.RedeliveryCount > policy.MaximumRedeliveries;
-            }
-            return false;
-        }
-
-        private bool IsMessageExpired(IMessageDelivery delivery)
-        {
-            Message.Message message = delivery.Message;
-            if (message.NMSTimeToLive != TimeSpan.Zero)
-            {
-                DateTime now = DateTime.UtcNow;
-                if (!IsBrowser && (message.NMSTimestamp + message.NMSTimeToLive) < now)
-                {
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        #endregion
-
-        #region Protected Methods
-
-        protected void EnterMessageListenerEvent()
-        {
-            try
-            {
-                if(!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
-                    MessageListenerInUseEvent.Reset();
-            }
-            catch (Exception e)
-            {
-                Tracer.ErrorFormat("Failed to Reset MessageListener Event signal. Error : {0}", e);
-            }
-            
-        }
-
-        protected void LeaveMessageListenerEvent()
-        {
-            RemoveTaskRef();
-            try
-            {
-                if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
-                    MessageListenerInUseEvent.Set();
-            }
-            catch (Exception e)
-            {
-                Tracer.ErrorFormat("Failed to Send MessageListener Event signal. Error : {0}", e);
-            }
-        }
-
-        protected bool WaitOnMessageListenerEvent(int timeout = -1)
-        {
-            bool signaled = false;
-            if (OnMessage != null)
-            {
-                if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
-                {
-                    signaled = (timeout > -1) ? MessageListenerInUseEvent.WaitOne(timeout) : MessageListenerInUseEvent.WaitOne();
-                }
-                else if (!this.IsClosed)
-                {
-                    Tracer.WarnFormat("Failed to wait for Message Listener Event on consumer {0}", Id);
-                }
-            }
-            
-            return signaled;
-        }
-
-        protected void AddTaskRef()
-        {
-            System.Threading.Interlocked.Increment(ref pendingMessageDeliveryTasks);
-            int lastPending = MaxPendingTasks;
-            MaxPendingTasks = Math.Max(pendingMessageDeliveryTasks, MaxPendingTasks);
-            if (lastPending < MaxPendingTasks)
-            {
-                //Tracer.WarnFormat("Consumer {0} Distpatch event highwatermark increased to {1}.", Id, MaxPendingTasks);
-            }
-        }
-
-        protected void RemoveTaskRef()
-        {
-            System.Threading.Interlocked.Decrement(ref pendingMessageDeliveryTasks);
-        }
-        
-        protected void OnInboundMessage(IReceiverLink link, Amqp.Message message)
-        {
-            Message.Message msg = null;
-            try
-            {
-                IMessage nmsMessage = Message.AMQP.AMQPMessageBuilder.CreateProviderMessage(this, message);
-                msg = nmsMessage as Message.Message;
-                if(
-                    Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
-                    Session.AcknowledgementMode.Equals(AcknowledgementMode.ClientAcknowledge)
-                    )
-                {
-                    msg.GetMessageCloak().AckHandler = new Message.MessageAcknowledgementHandler(this, msg);
-                }
-            }
-            catch (Exception e)
-            {
-                this.Session.OnException(e);
-            }
-
-            if(msg != null)
-            {
-                transportMsgCount++;
-                
-                SendForDelivery(new MessageDelivery(msg));
-            }
-        }
-
-        protected void SendForDelivery(IMessageDelivery nmsDelivery)
-        {
-            this.messageQueue.Enqueue(nmsDelivery);
-
-            if (this.OnMessage != null && pendingMessageDeliveryTasks < ExecutorThreshold)
-            {
-                AddTaskRef();
-                DispatchEvent dispatchEvent = new MessageListenerDispatchEvent(this);
-                Session.Dispatcher.Enqueue(dispatchEvent);
-            }
-            else if (pendingMessageDeliveryTasks < 0)
-            {
-                Tracer.ErrorFormat("Consumer {0} has invalid pending tasks count on executor {1}.", Id, pendingMessageDeliveryTasks);
-            }
-        }
-
-        protected virtual void OnAttachResponse(ILink link, Attach attachResponse)
-        {
-            Tracer.InfoFormat("Received Performative Attach response on Link: {0}, Response: {1}", ConsumerId, attachResponse.ToString());
-            OnResponse();
-            if (link.Error != null)
-            {
-                this.Session.OnException(ExceptionSupport.GetException(link, "Consumer {0} Attach Failure.", this.ConsumerId));
-            }
-        }
-
-        protected void SendFlow(int credit)
-        {
-            if (!mode.Value.Equals(Resource.Mode.Stopped))
-            {
-                this.Link.SetCredit(credit, false);
-            }
-        }
-
-        /// <summary>
-        /// Takes the next deliverable message from the consumer's message queue.
-        /// Expired messages are rejected and messages whose redelivery count is exceeded are
-        /// acknowledged as modified (failed, undeliverable); both are skipped and the wait continues.
-        /// </summary>
-        /// <param name="delivery">The dequeued delivery; null when the method returns false.</param>
-        /// <param name="timeout">
-        /// Milliseconds to wait. 0 polls the queue without waiting. A negative value is handed to the
-        /// queue unchanged and is never shortened between attempts.
-        /// </param>
-        /// <returns>true when a deliverable message was dequeued; otherwise false.</returns>
-        /// <exception cref="NMSException">Any failure while dequeuing, wrapped by ExceptionSupport.Wrap.</exception>
-        protected virtual bool TryDequeue(out IMessageDelivery delivery, int timeout)
-        {
-            delivery = null;
-            DateTime deadline = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeout);
-            Tracer.DebugFormat("Waiting for msg availability Deadline {0}", deadline);
-            try
-            {
-                while (true)
-                {
-                    if (timeout == 0)
-                    {
-                        delivery = this.messageQueue.DequeueNoWait();
-                    }
-                    else
-                    {
-                        delivery = this.messageQueue.Dequeue(timeout);
-                    }
-
-                    if (delivery == null)
-                    {
-                        if (timeout == 0 || this.Link.IsClosed || this.messageQueue.IsClosed)
-                        {
-                            return false;
-                        }
-                        else if (timeout > 0)
-                        {
-                            // TotalMilliseconds is the whole remaining span; Milliseconds is only the
-                            // 0-999 component and would truncate waits of a second or more.
-                            timeout = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0);
-                        }
-                    }
-                    else if (IsMessageExpired(delivery))
-                    {
-                        // Captured before the ack so the trace below reports the time of the check.
-                        DateTime now = DateTime.UtcNow;
-                        Error err = new Error(NMSErrorCode.PROPERTY_ERROR);
-                        err.Description = "Message Expired";
-                        AckRejected(delivery,  err);
-                        if (timeout > 0)
-                        {
-                            timeout = Math.Max((int)(deadline - now).TotalMilliseconds, 0);
-                        }
-                        if (Tracer.IsDebugEnabled)
-                        {
-                            Tracer.DebugFormat("{0} Filtered expired (deadline {1} Now {2}) message: {3}", ConsumerId, deadline, now, delivery);
-                        }
-                    }
-                    else if (IsMessageRedeliveryExceeded(delivery))
-                    {
-                        if (Tracer.IsDebugEnabled)
-                        {
-                            Tracer.DebugFormat("{0} Filtered Message with excessive Redelivery Count: {1}", ConsumerId, delivery);
-                        }
-                        AckModified(delivery, true, true);
-                        if (timeout > 0)
-                        {
-                            timeout = Math.Max((int)(deadline - DateTime.UtcNow).TotalMilliseconds, 0);
-                        }
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                throw ExceptionSupport.Wrap(e, "Failed to Received message on consumer {0}.", ConsumerId);
-            }
-
-            return true;
-        }
-
-        /// <summary>
-        /// Rejects synchronous receive calls when a message listener is registered.
-        /// </summary>
-        /// <exception cref="IllegalStateException">Thrown when <c>OnMessage</c> has a subscriber.</exception>
-        protected void ThrowIfAsync()
-        {
-            if (this.OnMessage != null)
-            {
-                // A registered listener makes this an asynchronous consumer.
-                throw new IllegalStateException("Cannot synchronously receive message on an asynchronous consumer " + consumerInfo);
-            }
-        }
-
-        /// <summary>
-        /// Enqueues a listener dispatch event on the session dispatcher when a listener is registered
-        /// and the message queue is not empty.
-        /// </summary>
-        protected void DrainMessageQueueIfAny()
-        {
-            if (OnMessage == null || messageQueue.Count <= 0)
-            {
-                return;
-            }
-
-            DispatchEvent pendingDispatch = new MessageListenerDispatchEvent(this);
-            Session.Dispatcher.Enqueue(pendingDispatch);
-        }
-
-        /// <summary>
-        /// Puts a message into its delivered state: bytes and stream messages are reset, any other
-        /// message is marked read-only, and the properties are always marked read-only.
-        /// </summary>
-        /// <param name="message">Message to prepare; null is ignored.</param>
-        protected void PrepareMessageForDelivery(Message.Message message)
-        {
-            if (message == null)
-            {
-                return;
-            }
-
-            switch (message)
-            {
-                case Message.BytesMessage bytesMessage:
-                    bytesMessage.Reset();
-                    break;
-                case Message.StreamMessage streamMessage:
-                    streamMessage.Reset();
-                    break;
-                default:
-                    message.IsReadOnly = true;
-                    break;
-            }
-
-            message.IsReadOnlyProperties = true;
-        }
-
-        #endregion
-
-        #region Internal Methods
-
-        /// <summary>
-        /// Tells whether this open, durable consumer uses the given subscription name (case-sensitive compare).
-        /// </summary>
-        internal bool HasSubscription(string name)
-        {
-            if (IsClosed || !IsDurable)
-            {
-                return false;
-            }
-
-            int comparison = String.Compare(name, consumerInfo.SubscriptionName, false);
-            return comparison == 0;
-        }
-
-        /// <summary>
-        /// Tells whether this consumer's destination equals <paramref name="destination"/>.
-        /// </summary>
-        internal bool IsUsingDestination(IDestination destination) => Destination.Equals(destination);
-
-        /// <summary>
-        /// Hands every tracked delivery back for redelivery, newest first. Each delivery gets its
-        /// delivery count incremented and is flagged to be enqueued first.
-        /// </summary>
-        internal void Recover()
-        {
-            Tracer.DebugFormat("Session recover for consumer: {0}", Id);
-            lock (messageQueue.SyncRoot)
-            {
-                // Re-read the tail on every pass; the loop stops on an empty list or a null entry.
-                for (var tail = delivered.Last; tail?.Value != null; tail = delivered.Last)
-                {
-                    IMessageDelivery redelivery = tail.Value;
-                    IMessageCloak messageCloak = redelivery.Message.GetMessageCloak();
-                    messageCloak.DeliveryCount += 1;
-                    (redelivery as MessageDelivery).EnqueueFirst = true;
-                    delivered.RemoveLast();
-                    SendForDelivery(redelivery);
-                }
-
-                delivered.Clear();
-            }
-        }
-
-        /// <summary>
-        /// Acknowledges a single message with the given acknowledgement type. The tracked delivery
-        /// for the message is used when one exists; otherwise a new delivery wraps the message.
-        /// </summary>
-        /// <exception cref="NMSException">Thrown for an acknowledgement type not handled below.</exception>
-        internal void AcknowledgeMessage(Message.Message message, Message.AckType ackType)
-        {
-            ThrowIfClosed();
-
-            IMessageDelivery target = null;
-            foreach (IMessageDelivery candidate in delivered)
-            {
-                // No early exit: when several tracked deliveries match, the last one is used.
-                if (candidate.Message.Equals(message))
-                {
-                    target = candidate;
-                }
-            }
-
-            target = target ?? new MessageDelivery(message);
-
-            switch (ackType)
-            {
-                case Message.AckType.ACCEPTED:
-                    AckConsumed(target);
-                    return;
-                case Message.AckType.MODIFIED_FAILED:
-                    AckModified(target, true);
-                    return;
-                case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
-                    AckModified(target, true, true);
-                    return;
-                case Message.AckType.REJECTED:
-                    AckRejected(target);
-                    return;
-                case Message.AckType.RELEASED:
-                    AckReleased(target);
-                    return;
-                default:
-                    throw new NMSException("Unkown message acknowledgement type " + ackType);
-            }
-        }
-
-        /// <summary>
-        /// Applies one acknowledgement type to every tracked delivery, then clears the tracked list.
-        /// An acknowledgement type not handled below is logged as a warning for each delivery and skipped.
-        /// </summary>
-        /// <param name="ackType">Acknowledgement applied to each tracked delivery.</param>
-        internal void Acknowledge(Message.AckType ackType)
-        {
-            // Iterate over a snapshot so the tracked list is not enumerated while acks run.
-            foreach (IMessageDelivery delivery in delivered.ToArray())
-            {
-                switch (ackType)
-                {
-                    case Message.AckType.ACCEPTED:
-                        AckConsumed(delivery);
-                        break;
-                    case Message.AckType.MODIFIED_FAILED:
-                        AckModified(delivery, true);
-                        break;
-                    case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
-                        AckModified(delivery, true, true);
-                        break;
-                    case Message.AckType.REJECTED:
-                        AckRejected(delivery);
-                        break;
-                    case Message.AckType.RELEASED:
-                        AckReleased(delivery);
-                        break;
-                    default:
-                        // "{}" is not a valid composite-format item; the message id needs index 1.
-                        Tracer.WarnFormat("Unkown message acknowledgement type {0} for message {1}", ackType, delivery.Message.NMSMessageId);
-                        break;
-                }
-            }
-            delivered.Clear();
-        }
-
-        #endregion
-
-        #region IMessageConsumer Properties
-
-        /// <summary>
-        /// Consumer transformers are not implemented; both accessors throw <see cref="NotImplementedException"/>.
-        /// </summary>
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get => throw new NotImplementedException();
-            set => throw new NotImplementedException();
-        }
-
-        #endregion
-
-        #region IMessageConsumer Events
-        /// <summary>Backing event that holds the registered message listeners.</summary>
-        protected event MessageListener OnMessage;
-
-        /// <summary>
-        /// Adds or removes a message listener. Both accessors throw while the consumer is started;
-        /// a null handler is ignored.
-        /// </summary>
-        public event MessageListener Listener
-        {
-            add
-            {
-                if (IsStarted)
-                {
-                    throw new IllegalStateException("Cannot add MessageListener to consumer " + Id + " on a started Connection.");
-                }
-
-                if (value == null)
-                {
-                    return;
-                }
-
-                OnMessage += value;
-            }
-            remove
-            {
-                if (IsStarted)
-                {
-                    throw new IllegalStateException("Cannot remove MessageListener to consumer " + Id + " on a started Connection.");
-                }
-
-                if (value == null)
-                {
-                    return;
-                }
-
-                OnMessage -= value;
-            }
-        }
-
-        #endregion
-
-        #region IMessageConsumer Methods
-        
-        /// <summary>
-        /// Receives the next message, passing -1 as the dequeue timeout.
-        /// </summary>
-        /// <returns>A prepared copy of the received message, or null when nothing was dequeued.</returns>
-        public IMessage Receive()
-        {
-            ThrowIfClosed();
-            ThrowIfAsync();
-            if (!TryDequeue(out IMessageDelivery received, -1))
-            {
-                return null;
-            }
-
-            Message.Message clientCopy = received.Message.Copy();
-            PrepareMessageForDelivery(clientCopy);
-            AckReceived(received);
-            return clientCopy;
-        }
-
-        /// <summary>
-        /// Receives the next message, waiting up to <paramref name="timeout"/>.
-        /// A timeout that converts to 0 milliseconds is replaced by -1 before dequeuing.
-        /// </summary>
-        /// <returns>A prepared copy of the received message, or null when nothing was dequeued.</returns>
-        public IMessage Receive(TimeSpan timeout)
-        {
-            ThrowIfClosed();
-            ThrowIfAsync();
-
-            int waitMillis = Convert.ToInt32(timeout.TotalMilliseconds);
-            waitMillis = (waitMillis == 0) ? -1 : waitMillis;
-
-            if (!TryDequeue(out IMessageDelivery received, waitMillis))
-            {
-                return null;
-            }
-
-            Message.Message clientCopy = received.Message.Copy();
-            PrepareMessageForDelivery(clientCopy);
-            AckReceived(received);
-            return clientCopy;
-        }
-
-        /// <summary>
-        /// Polls for a message without waiting (dequeue timeout 0).
-        /// </summary>
-        /// <returns>A prepared copy of the received message, or null when nothing was dequeued.</returns>
-        public IMessage ReceiveNoWait()
-        {
-            ThrowIfClosed();
-            ThrowIfAsync();
-            if (!TryDequeue(out IMessageDelivery received, 0))
-            {
-                return null;
-            }
-
-            Message.Message clientCopy = received.Message.Copy();
-            PrepareMessageForDelivery(clientCopy);
-            AckReceived(received);
-            return clientCopy;
-        }
-
-        #endregion
-
-        #region MessageLink Methods
-
-        /// <summary>Creates the default, unconfigured target terminus for the receiver link.</summary>
-        private Target CreateTarget()
-        {
-            return new Target();
-        }
-
-        /// <summary>
-        /// Builds the source terminus for the receiver link: address, supported outcomes, default
-        /// outcome, expiry policy, durability, distribution mode, capabilities and filters.
-        /// </summary>
-        private Source CreateSource()
-        {
-            Source source = new Source
-            {
-                Address = UriUtil.GetAddress(Destination, Session.Connection),
-                Outcomes = new Amqp.Types.Symbol[]
-                {
-                    SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
-                    SymbolUtil.ATTACH_OUTCOME_RELEASED,
-                    SymbolUtil.ATTACH_OUTCOME_REJECTED,
-                    SymbolUtil.ATTACH_OUTCOME_MODIFIED
-                },
-                DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE
-            };
-
-            if (IsDurable)
-            {
-                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
-                source.Durable = (int)TerminusDurability.UNSETTLED_STATE;
-                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
-            }
-            else
-            {
-                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
-                source.Durable = (int)TerminusDurability.NONE;
-            }
-
-            if (IsBrowser)
-            {
-                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
-            }
-
-            source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
-
-            // TODO add filters for noLocal and Selector using appropriate Amqp Described types.
-            // qpid jms defines both filters as amqp described types:
-            //   AmqpJmsNoLocalType:  Descriptor = 0x0000468C00000003UL, Described = "NoLocalFilter{}" (string)
-            //   AmqpJmsSelectorType: Descriptor = 0x0000468C00000004UL, Described = "<selector_string>" (string)
-            Amqp.Types.Map filterSet = new Amqp.Types.Map();
-
-            if (consumerInfo.NoLocal)
-            {
-                filterSet.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
-            }
-
-            if (HasSelector)
-            {
-                filterSet.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, consumerInfo.Selector);
-            }
-
-            // The filter set is only attached when at least one filter was added.
-            if (filterSet.Count > 0)
-            {
-                source.FilterSet = filterSet;
-            }
-
-            return source;
-        }
-        
-        /// <summary>
-        /// Creates the receiver link for this consumer. Durable consumers use the subscription name
-        /// as the link name; other consumers use "nms:receiver:" plus the consumer id and, when the
-        /// source address is not empty, ":" plus that address.
-        /// </summary>
-        protected override ILink CreateLink()
-        {
-            Attach attachFrame = new Amqp.Framing.Attach()
-            {
-                Target = CreateTarget(),
-                Source = CreateSource(),
-                RcvSettleMode = ReceiverSettleMode.First,
-                SndSettleMode = (IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
-            };
-
-            string linkName;
-            if (IsDurable)
-            {
-                linkName = consumerInfo.SubscriptionName;
-            }
-            else
-            {
-                string address = (attachFrame.Source as Source).Address ?? "";
-                string addressSuffix = (address.Length == 0) ? "" : (":" + address);
-                linkName = "nms:receiver:" + this.ConsumerId.ToString() + addressSuffix;
-            }
-
-            IReceiverLink receiver = new ReceiverLink(Session.InnerSession as Amqp.Session, linkName, attachFrame, OnAttachResponse);
-            return receiver;
-        }
-
-        /// <summary>
-        /// Traces the close notification (when debug tracing is on), runs the base close handling
-        /// and then calls <c>OnResponse</c>.
-        /// </summary>
-        protected override void OnInternalClosed(IAmqpObject sender, Error error)
-        {
-            if (Tracer.IsDebugEnabled)
-            {
-                string subscriptionText = IsDurable ? "with subscription name " + this.consumerInfo.SubscriptionName : "";
-                string causeText = error == null ? "" : "with cause " + error;
-                Tracer.DebugFormat("Received Close notification for MessageConsumer {0} {1} {2}",
-                    this.Id, subscriptionText, causeText);
-            }
-
-            base.OnInternalClosed(sender, error);
-            OnResponse();
-        }
-        
-        /// <summary>
-        /// Stops message delivery for this consumer and waits for a running message listener
-        /// callback to finish.
-        /// </summary>
-        /// <exception cref="IllegalStateException">Thrown when called on the session's dispatch thread.</exception>
-        protected override void StopResource()
-        {
-            if (Session.Dispatcher.IsOnDispatchThread)
-            {
-                // Format the message explicitly: the two-string constructor treats its second
-                // argument as an error code, which left "{0}" unexpanded in the message.
-                throw new IllegalStateException(
-                    string.Format("Cannot stop Connection {0} in MessageListener.", Session.Connection.ClientId));
-            }
-            // Cut message window
-            // TODO figure out draining message window without raising a closed window exception (link-credit-limit-exceeded Error) from amqpnetlite.
-            //SendFlow(1);
-            // Stop message delivery
-            this.messageQueue.Stop();
-            // Now wait until the MessageListener callback has finished executing.
-            this.WaitOnMessageListenerEvent();
-        }
-
-        /// <summary>
-        /// Starts the consumer: runs the base start, starts the message queue, schedules dispatch of
-        /// already queued messages and starts the link with the configured link credit.
-        /// </summary>
-        protected override void StartResource()
-        {
-            // Base start first (performs the attach request if it has not been done yet).
-            base.StartResource();
-
-            // Begin local delivery and flush anything that is already queued.
-            messageQueue.Start();
-            DrainMessageQueueIfAny();
-
-            // Register the inbound callback, then open the message window.
-            int linkCredit = consumerInfo.LinkCredit;
-            OnInboundAMQPMessage = OnInboundMessage;
-            Link.Start(linkCredit, OnInboundAMQPMessage);
-        }
-
-        
-        
-        /// <summary>
-        /// Executes the AMQP network detach operation.
-        /// </summary>
-        /// <param name="timeout">
-        /// Timeout to wait for the detach response. A timeout of 0 or less will not block to wait for a response.
-        /// </param>
-        /// <param name="cause">Error to detach link. Can be null.</param>
-        /// <exception cref="Amqp.AmqpException">
-        /// Thrown when an error occurs during amqp detach, or when the detach response contains an Error.
-        /// </exception>
-        /// <exception cref="System.TimeoutException">
-        /// Thrown when the detach response is not received in the specified timeout.
-        /// </exception>
-        protected override void DoClose(TimeSpan timeout, Error cause = null)
-        {
-            if (!IsDurable)
-            {
-                base.DoClose(timeout, cause);
-                return;
-            }
-
-            Task detachTask = this.Link.DetachAsync(cause);
-            if (TimeSpan.Compare(timeout, TimeSpan.Zero) <= 0)
-            {
-                // Non-positive timeout: the detach was requested but its response is not awaited.
-                return;
-            }
-
-            /*
-             * AmqpNetLite does not allow a timeout to be specified for a link detach request even though
-             * it uses the same close operation which takes a parameter for timeout. AmqpNetLite uses
-             * its default timeout of 60000ms, see AmqpObject.DefaultTimeout, for the detach close
-             * operation, forcing the detach request to be synchronous. To allow for an asynchronous detach
-             * request an NMS MessageConsumer must call the DetachAsync method on a link, which will block
-             * for up to 60000ms asynchronously to set a timeout exception or complete the task.
-             */
-            const int amqpNetLiteDefaultTimeoutMillis = 60000; // taken from AmqpObject.DefaultTimeout
-            TimeSpan amqpNetLiteDefaultTimeout = TimeSpan.FromMilliseconds(amqpNetLiteDefaultTimeoutMillis);
-            // Create timeout which allows for the 60000ms block in the DetachAsync task.
-            TimeSpan actualTimeout = amqpNetLiteDefaultTimeout + timeout;
-
-            TaskUtil.Wait(detachTask, actualTimeout);
-
-            // Task.Exception is always an AggregateException; surface its first inner exception
-            // (or the aggregate itself if there is none) without resetting the stack trace.
-            AggregateException failure = detachTask.Exception;
-            if (failure != null)
-            {
-                Exception toThrow = failure.InnerException ?? failure;
-                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(toThrow).Throw();
-            }
-        }
-        /// <summary>
-        /// <see cref="MessageConsumer"/> override of the template method <see cref="MessageLink.Shutdown"/>:
-        /// closes the consumer's message queue.
-        /// </summary>
-        internal override void Shutdown()
-        {
-            messageQueue.Close();
-        }
-
-        #endregion
-
-        #region IDisposable Methods
-        /// <summary>
-        /// Closes the consumer. A failure during close is traced at debug level and not rethrown.
-        /// </summary>
-        public void Dispose()
-        {
-            try
-            {
-                Close();
-            }
-            catch (Exception closeFailure)
-            {
-                Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", GetType().Name, Id, closeFailure);
-            }
-        }
-        /// <summary>
-        /// Logs the consumer's message counters when the link is neither closing nor closed, then
-        /// runs the base dispose and disposes the listener-in-use event.
-        /// </summary>
-        protected override void Dispose(bool disposing)
-        {
-            bool closingOrClosed = IsClosing || IsClosed;
-            if (!closingOrClosed)
-            {
-                Tracer.InfoFormat("Consumer {0} stats: Transport Msgs {1}, Dispatch Msgs {2}, messageQueue {3}.",
-                    Id, transportMsgCount, messageDispatchCount, messageQueue.Count);
-            }
-
-            base.Dispose(disposing);
-            MessageListenerInUseEvent.Dispose();
-        }
-
-
-        #endregion
-
-        #region Inner MessageListenerDispatchEvent Class
-
-        /// <summary>
-        /// Dispatch event that drains the consumer's message queue and hands each message to the
-        /// consumer's message listener.
-        /// </summary>
-        protected class MessageListenerDispatchEvent : WaitableDispatchEvent
-        {
-            private readonly MessageConsumer consumer;
-
-            internal MessageListenerDispatchEvent(MessageConsumer consumer) : base()
-            {
-                this.consumer = consumer;
-                Callback = this.DispatchMessageListeners;
-            }
-
-            /// <summary>Runs the base failure handling and reports the failure to the session.</summary>
-            public override void OnFailure(Exception e)
-            {
-                base.OnFailure(e);
-                consumer.Session.OnException(e);
-            }
-
-            /// <summary>
-            /// Delivers every queued message to the listener. Expired messages and messages whose
-            /// redelivery count is exceeded are acknowledged as modified instead of being delivered.
-            /// In auto/dups-ok acknowledgement modes a delivered message is accepted after the listener
-            /// returns, or released when the listener threw a <see cref="SystemException"/>.
-            /// </summary>
-            public void DispatchMessageListeners()
-            {
-                IMessageDelivery delivery = null;
-                Message.Message nmsProviderMessage = null;
-                if (consumer.IsClosed) return;
-                consumer.EnterMessageListenerEvent();
-                try
-                {
-                    // the consumer pending Message delivery task
-                    while ((delivery = consumer.messageQueue.DequeueNoWait()) != null)
-                    {
-                        nmsProviderMessage = delivery.Message;
-                        consumer.AddTaskRef();
-                        consumer.messageDispatchCount++;
-                        try
-                        {
-                            if (consumer.IsMessageExpired(delivery))
-                            {
-                                consumer.AckModified(delivery, true);
-                            }
-                            else if (consumer.IsMessageRedeliveryExceeded(delivery))
-                            {
-                                consumer.AckModified(delivery, true, true);
-                            }
-                            else
-                            {
-                                bool deliveryFailed = false;
-                                bool isAutoOrDupsOk = consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
-                                                      consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.DupsOkAcknowledge);
-                                if (isAutoOrDupsOk)
-                                {
-                                    consumer.delivered.AddLast(delivery);
-                                }
-                                else
-                                {
-                                    consumer.AckReceived(delivery);
-                                }
-
-                                Message.Message copy = nmsProviderMessage.Copy();
-                                try
-                                {
-                                    consumer.Session.ClearRecovered();
-                                    consumer.PrepareMessageForDelivery(copy);
-                                    if (Tracer.IsDebugEnabled)
-                                    {
-                                        Tracer.DebugFormat("Invoking Client Message Listener Callback for message {0}.", copy.NMSMessageId);
-                                    }
-                                    consumer.OnMessage(copy);
-                                }
-                                catch (SystemException se)
-                                {
-                                    Tracer.WarnFormat("Caught Exception on MessageListener for Consumer {0}. Message {1}.", consumer.Id, se.Message);
-                                    deliveryFailed = true;
-                                }
-
-                                if (isAutoOrDupsOk && !consumer.Session.IsRecovered)
-                                {
-                                    if (!deliveryFailed)
-                                    {
-                                        consumer.AckConsumed(delivery);
-                                    }
-                                    else
-                                    {
-                                        consumer.AckReleased(delivery);
-                                    }
-                                }
-                            }
-                        }
-                        catch (Exception e)
-                        {
-                            // unhandled failure
-                            consumer.Session.OnException(e);
-                        }
-                        finally
-                        {
-                            // Always release the task reference, even if reporting the failure throws.
-                            consumer.RemoveTaskRef();
-                        }
-                    }
-                }
-                finally
-                {
-                    // Always leave the listener event so a thread waiting for the listener to
-                    // finish is not left blocked by an exception escaping the loop.
-                    consumer.LeaveMessageListenerEvent();
-                }
-            }
-        }
-
-        #endregion
-    }
-
-    #region Info class
-    /// <summary>
-    /// Link settings of a message consumer: link credit, selector, subscription name and the
-    /// no-local flag.
-    /// </summary>
-    internal class ConsumerInfo : LinkInfo
-    {
-        /// <summary>Link credit reported while no explicit credit has been set.</summary>
-        protected const int DEFAULT_CREDIT = 200;
-
-        private int? credit;
-
-        internal ConsumerInfo(Id id) : base(id)
-        {
-        }
-
-        /// <summary>The explicitly assigned link credit, or <see cref="DEFAULT_CREDIT"/> when none was assigned.</summary>
-        public int LinkCredit
-        {
-            get => credit.HasValue ? credit.Value : DEFAULT_CREDIT;
-            internal set => credit = value;
-        }
-
-        public string Selector { get; internal set; }
-
-        public string SubscriptionName { get; internal set; }
-
-        public bool NoLocal { get; internal set; }
-    }
-
-    #endregion
-
-}
diff --git a/src/NMS.AMQP/MessageLink.cs b/src/NMS.AMQP/MessageLink.cs
deleted file mode 100644
index 8958f9b..0000000
--- a/src/NMS.AMQP/MessageLink.cs
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Amqp;
-using Amqp.Framing;
-using System.Reflection;
-using Apache.NMS.AMQP.Util;
-using System.Collections.Specialized;
-
-namespace Apache.NMS.AMQP
-{
-
-    /// <summary>
-    /// Attach/detach life-cycle states of a <c>MessageLink</c>.
-    /// </summary>
-    internal enum LinkState
-    {
-        /// <summary>Placeholder used before the outcome of an attach is known.</summary>
-        UNKNOWN = -1,
-
-        /// <summary>No attach in progress; also the state restored after a failed or timed-out attach.</summary>
-        INITIAL = 0,
-
-        /// <summary>Attach started, response not processed yet.</summary>
-        ATTACHSENT = 1,
-
-        /// <summary>Attach completed without error.</summary>
-        ATTACHED = 2,
-
-        /// <summary>Detach started on an attached link.</summary>
-        DETACHSENT = 3,
-
-        /// <summary>Link closed, or detached before it was ever attached.</summary>
-        DETACHED = 4
-    }
-
-    /// <summary>
-    /// Durability levels for a link terminus; the numeric values are cast to int when written
-    /// to the source's Durable field.
-    /// </summary>
-    internal enum TerminusDurability
-    {
-        NONE = 0,
-
-        CONFIGURATION = 1,
-
-        UNSETTLED_STATE = 2,
-    }
-    
-    /// <summary>
-    /// Abstract Template for AmqpNetLite Amqp.ILink.
-    /// This class templates the Attach and Detach performative processes for the AMQP protocol engine class.
-    /// The template operations are Attach and Detach.
-    /// </summary>
-    abstract class MessageLink : NMSResource<LinkInfo>
-    {
-        // Latch awaited by Attach(); non-null only while an attach request is pending.
-        private CountDownLatch responseLatch = null;
-
-        // Underlying link, assigned from CreateLink() during Attach().
-        private ILink impl;
-
-        // Current life-cycle state, changed through compare-and-set.
-        private Atomic<LinkState> state = new Atomic<LinkState>(LinkState.INITIAL);
-
-        private readonly Session session;
-        private readonly IDestination destination;
-
-        // Reset when Attach() begins and set when it finishes; Detach() waits on it.
-        private System.Threading.ManualResetEvent PerformativeOpenEvent = new System.Threading.ManualResetEvent(false);
-
-        /// <summary>
-        /// Creates a link owned by <paramref name="ses"/> for the provider destination <paramref name="dest"/>.
-        /// </summary>
-        protected MessageLink(Session ses, Destination dest)
-        {
-            this.session = ses;
-            this.destination = dest;
-        }
-
-        /// <summary>
-        /// Creates a link owned by <paramref name="ses"/> for any <see cref="IDestination"/>.
-        /// A provider <c>Destination</c> (or null) is used as is; a foreign, non-temporary destination
-        /// is converted through the session into a provider queue or topic of the same name.
-        /// </summary>
-        /// <exception cref="NotImplementedException">Thrown for a foreign temporary destination.</exception>
-        protected MessageLink(Session ses, IDestination dest)
-        {
-            session = ses;
-            if (dest is Destination || dest == null)
-            {
-                destination = dest as Destination;
-            }
-            else
-            {
-                if (!dest.IsTemporary)
-                {
-                    if (dest.IsQueue)
-                    {
-                        destination = Session.GetQueue((dest as IQueue).QueueName) as Destination;
-                    }
-                    else
-                    {
-                        // A foreign topic must stay a topic; GetQueue here turned it into a queue.
-                        destination = Session.GetTopic((dest as ITopic).TopicName) as Destination;
-                    }
-                }
-                else
-                {
-                    throw new NotImplementedException("Foreign temporary Destination Implementation Not Supported.");
-                }
-            }
-        }
-
-        /// <summary>Session that owns this link.</summary>
-        internal virtual Session Session => session;
-
-        /// <summary>Destination this link was created for.</summary>
-        internal IDestination Destination => destination;
-
-        /// <summary>Current life-cycle state.</summary>
-        protected LinkState State => state.Value;
-
-        /// <summary>Underlying link; the setter is intentionally a no-op.</summary>
-        protected ILink Link
-        {
-            get => impl;
-            private set { }
-        }
-
-        internal bool IsClosing => state.Value == LinkState.DETACHSENT;
-
-        internal bool IsClosed => state.Value == LinkState.DETACHED;
-
-        protected bool IsConfigurable => state.Value == LinkState.INITIAL;
-
-        protected bool IsOpening => state.Value == LinkState.ATTACHSENT;
-
-        /// <summary>True while an attach request is awaiting its response.</summary>
-        protected bool IsRequestPending => responseLatch != null;
-
-        /// <summary>Failure recorded for this link, if any.</summary>
-        internal NMSException FailureCause { get; set; }
-
-        /// <summary>
-        /// Records the failure cause built from an AMQP error and an optional reason text.
-        /// </summary>
-        internal void SetFailureCause(Error error, string reason = "")
-        {
-            NMSException cause = ExceptionSupport.GetException(error, reason);
-            FailureCause = cause;
-        }
-
-        /// <summary>
-        /// Attaches the link when it is in the INITIAL state: creates the link, waits for the attach
-        /// response (bounded by the request timeout when that is positive) and moves to ATTACHED on
-        /// success or back to INITIAL on timeout or error. Does nothing in any other state.
-        /// </summary>
-        internal void Attach()
-        {
-            if (!state.CompareAndSet(LinkState.INITIAL, LinkState.ATTACHSENT))
-            {
-                return;
-            }
-
-            PerformativeOpenEvent.Reset();
-            responseLatch = new CountDownLatch(1);
-            impl = CreateLink();
-            Link.AddClosedCallback(OnInternalClosed);
-            LinkState outcome = LinkState.UNKNOWN;
-            try
-            {
-                bool responded = true;
-                if (Info.requestTimeout > 0)
-                {
-                    responded = responseLatch.await(RequestTimeout);
-                }
-                else
-                {
-                    // No positive timeout configured: wait without a bound.
-                    responseLatch.await();
-                }
-
-                if (!responded)
-                {
-                    outcome = LinkState.INITIAL;
-                    Tracer.InfoFormat("Link {0} Attach timeout", Info.Id);
-                    OnTimeout();
-                }
-                else if (impl.Error != null)
-                {
-                    outcome = LinkState.INITIAL;
-                    Tracer.InfoFormat("Link {0} Attach error: {1}", Info.Id, impl.Error);
-                    OnFailure();
-                }
-                else
-                {
-                    outcome = LinkState.ATTACHED;
-                }
-            }
-            finally
-            {
-                responseLatch = null;
-                state.GetAndSet(outcome);
-
-                // Anything short of ATTACHED closes a link that is still open.
-                bool attached = state.Value.Equals(LinkState.ATTACHED);
-                if (!attached && !impl.IsClosed)
-                {
-                    DoClose();
-                }
-
-                // Release any Detach() call waiting for the attach to finish.
-                PerformativeOpenEvent.Set();
-            }
-        }
-
-        /// <summary>
-        /// Detaches the link. An attached link is closed; a link that was never attached is marked
-        /// DETACHED; a link with an attach in progress waits (up to the request timeout) for the
-        /// attach to finish and is then closed or marked DETACHED.
-        /// </summary>
-        /// <exception cref="NMSException">Thrown when closing the link fails.</exception>
-        protected virtual void Detach()
-        {
-            // Closes the link, passing NMS exceptions through and wrapping any other exception.
-            void CloseLink()
-            {
-                try
-                {
-                    DoClose();
-                }
-                catch (NMSException)
-                {
-                    throw;
-                }
-                catch (Exception ex)
-                {
-                    throw ExceptionSupport.Wrap(ex, "Failed to close Link {0}", this.Id);
-                }
-            }
-
-            if (state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHSENT))
-            {
-                CloseLink();
-                return;
-            }
-
-            if (state.CompareAndSet(LinkState.INITIAL, LinkState.DETACHED))
-            {
-                // The link has not been established yet; the state is now detached.
-                return;
-            }
-
-            if (!state.Value.Equals(LinkState.ATTACHSENT))
-            {
-                return;
-            }
-
-            // An attach is in progress: wait until its response has been processed.
-            bool attachFinished = this.PerformativeOpenEvent.WaitOne(this.RequestTimeout);
-            if (!attachFinished)
-            {
-                // The attach never signalled completion.
-                state.GetAndSet(LinkState.DETACHED);
-                return;
-            }
-
-            if (state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHSENT))
-            {
-                // The attach succeeded and established a link; close it now.
-                CloseLink();
-            }
-            else
-            {
-                // The attach failed; move from INITIAL to DETACHED if that is still the state.
-                state.CompareAndSet(LinkState.INITIAL, LinkState.DETACHED);
-            }
-        }
-
-        /// <summary>
-        /// Defines the asynchronous Amqp.ILink error and close notification handler for the template.
-        /// This Method matches the delegate <see cref="Amqp.ClosedCallback"/>.
-        /// Concrete implementations are required to implement this method.
-        /// </summary>
-        /// <param name="sender">
-        /// The <see cref="Amqp.IAmqpObject"/> that has closed. Also, <seealso cref="Amqp.ClosedCallback"/>.
-        /// This will always be an ILink for the template.
-        /// </param>
-        /// <param name="error">
-        /// The <see cref="Amqp.Framing.Error"/> that caused the link to close.
-        /// This can be null should the link be closed intentially.
-        /// </param>
-        protected virtual void OnInternalClosed(Amqp.IAmqpObject sender, Error error)
-        {
-            bool failureThrow = true;
-            Session parent = null;
-            string name = (sender as ILink)?.Name;
-            NMSException failure = ExceptionSupport.GetException(error, 
-                "Received Amqp link detach with Error for link {0}", name); 
-
-            try
-            {
-                parent = this.Session;
-                this.FailureCause = failure;
-                if (this.state.CompareAndSet(LinkState.DETACHSENT, LinkState.DETACHED))
-                {
-                    // expected close
-                    parent.Remove(this);
-                }
-                else if (this.state.CompareAndSet(LinkState.ATTACHED, LinkState.DETACHED))
-                {
-                    // unexpected close or parent close
-                    if (this.IsStarted)
-                    {
-                        // indicate unexpected close
-                        this.Shutdown();
-                    }
-                    else
-                    {
-                        failureThrow = false;
-                    }
-
-                    parent.Remove(this);
-                }
-            }
-            catch (Exception ex)
-            {
-                // failure during NMS object instance cleanup.
-                Tracer.DebugFormat("Caught exception during Amqp Link {0} close. Exception {1}", name, ex);
-            }
-            
-            // Determine if there is a provider controlled call for this closed callback.
-            if (!failureThrow && failure != null)
-            {
-                // no provider controlled call.
-                // Log error if any.
-                
-                // Log exception.
-                Tracer.InfoFormat("Amqp Performative detach response error. Message {0}", failure.Message);
-                
-            }
-        }
-
-        /// <summary>
-        /// Defines the link create operation for the abstract template.
-        /// Concrete implmentations are required to implement this method.
-        /// </summary>
-        /// <returns>
-        /// An ILink that was configured by concrete implementation.
-        /// </returns>
-        protected abstract ILink CreateLink();
-
-        /// <summary>
-        /// See <see cref="MessageLink.DoClose(TimeSpan, Error)"/>.
-        /// </summary>
-        /// <param name="cause"></param>
-        protected void DoClose(Error cause = null)
-        {
-            this.DoClose(TimeSpan.FromMilliseconds(Info.closeTimeout), cause);
-        }
-
-        /// <summary>
-        /// Defines the link close operation for the abstract template.
-        /// Concrete implementation can implement this method to override the default Link close operation.
-        /// </summary>
-        /// <param name="timeout">
-        /// Timeout on network operation close for link. 
-        /// If greater then 0 then operation will be blocking.
-        /// Otherwise the network operation is non-blocking.
-        /// </param>
-        /// <param name="cause">
-        /// The amqp Error that caused the link to close.
-        /// </param>
-        /// <exception cref="Amqp.AmqpException">Throws for Detach response errors.</exception>
-        /// <exception cref="System.TimeoutException">Throws when timeout expires for blocking detach request.</exception>
-        protected virtual void DoClose(TimeSpan timeout, Error cause = null)
-        {
-            if (this.impl != null && !this.impl.IsClosed)
-            {
-                Tracer.DebugFormat("Detaching amqp link {0} for {1} with timeout {2}", this.Link.Name, this.Id, timeout);
-                this.impl.Close(timeout, cause);
-            }
-        }
-
-        protected virtual void OnResponse()
-        {
-            if (responseLatch != null)
-            {
-                responseLatch.countDown();
-            }
-        }
-
-        protected virtual void OnTimeout()
-        {
-            throw ExceptionSupport.GetTimeoutException(this.impl, "Performative Attach Timeout while waiting for response.");
-        }
-
-        protected virtual void OnFailure()
-        {
-            throw ExceptionSupport.GetException(this.impl, "Performative Attach Error.");
-        }
-
-        protected virtual void Configure()
-        {
-            StringDictionary connProps = Session.Connection.Properties;
-            StringDictionary sessProps = Session.Properties;
-            PropertyUtil.SetProperties(Info, connProps);
-            PropertyUtil.SetProperties(Info, sessProps);
-
-        }
-
-
-        #region NMSResource Methhods
-
-        protected override void StartResource()
-        {
-            this.Attach();
-        }
-        
-        protected override void ThrowIfClosed()
-        {
-            if (state.Value.Equals(LinkState.DETACHED) || this.FailureCause != null)
-            {
-                throw new Apache.NMS.IllegalStateException("Illegal operation on closed I" + this.GetType().Name + ".", this.FailureCause);
-            }
-        }
-
-        #endregion
-
-        #region Public Inheritable Properties
-
-        public TimeSpan RequestTimeout
-        {
-            get { return TimeSpan.FromMilliseconds(Info.requestTimeout); }
-            set { Info.requestTimeout = Convert.ToInt64(value.TotalMilliseconds); }
-        }
-
-        #endregion
-
-        #region Public Inheritable Methods
-
-        // All derived classes from MessageLink must be IDisposable.
-        // Use the IDispoable pattern for Close() and have Dispose() just call Close() and Close() do
-        // the things usually done in Dispose().
-        public void Close()
-        {
-            this.Dispose(true);
-            if (IsClosed)
-            {
-                GC.SuppressFinalize(this);
-            }
-        }
-        /// <summary>
-        /// Implements a template for the IDisposable Parttern. 
-        /// </summary>
-        /// <param name="disposing"></param>
-        protected virtual void Dispose(bool disposing)
-        {
-            if (this.IsClosed) return;
-            if (disposing)
-            {
-                // orderly shutdown
-                this.Stop();
-                this.Shutdown();
-                try
-                {
-                    this.Detach();
-                }
-                catch (Exception ex)
-                {
-                    // Log network errors
-                    Tracer.DebugFormat("Performative detached raised exception for {0} {1}. Message {2}",
-                        this.GetType().Name, this.Id, ex.Message);
-                }
-                finally
-                {
-                    // in case detach failed or times out.
-                    if (!this.IsClosed)
-                    {
-                        this.Session.Remove(this);
-                        this.state.Value = LinkState.DETACHED;
-                    }
-                    this.impl = null;
-                }
-            }
-        }
-
-        #endregion
-
-        #region Shutdown Template methods
-
-        /// <summary>
-        /// Shutdown orderly shuts down the Message link facilities.
-        /// </summary>
-        internal virtual void Shutdown()
-        {
-        }
-
-        
-        #endregion
-    }
-
-    #region LinkInfo Class
-
-    internal abstract class LinkInfo : ResourceInfo
-    {
-        protected static readonly long DEFAULT_REQUEST_TIMEOUT;
-        static LinkInfo()
-        {
-            DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
-        }
-
-        protected LinkInfo(Id linkId) : base(linkId)
-        {
-
-        }
-
-        public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
-        public long closeTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
-        public long sendTimeout { get; set; }
-
-        public override string ToString()
-        {
-            string result = "";
-            result += "LinkInfo = [\n";
-            foreach (MemberInfo info in this.GetType().GetMembers())
-            {
-                if (info is PropertyInfo)
-                {
-                    PropertyInfo prop = info as PropertyInfo;
-                    if (prop.GetGetMethod(true).IsPublic)
-                    {
-                        result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
-                    }
-                }
-            }
-            result = result.Substring(0, result.Length - 2) + "\n]";
-            return result;
-        }
-
-    }
-
-    #endregion
-}
diff --git a/src/NMS.AMQP/MessageProducer.cs b/src/NMS.AMQP/MessageProducer.cs
deleted file mode 100644
index 08d1f7d..0000000
--- a/src/NMS.AMQP/MessageProducer.cs
+++ /dev/null
@@ -1,555 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Collections.Specialized;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using System.Reflection;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.AMQP.Message;
-using Apache.NMS.AMQP.Message.AMQP;
-using Apache.NMS.AMQP.Message.Cloak;
-using Amqp;
-using Amqp.Framing;
-using System.Threading;
-
-namespace Apache.NMS.AMQP
-{
-    /// <summary>
-    /// Apache.NMS.AMQP.MessageProducer facilitates management and creates the underlying Amqp.SenderLink protocol engine object.
-    /// Apache.NMS.AMQP.MessageProducer is also a Factory for Apache.NMS.AMQP.Message.Message types.
-    /// </summary>
-    class MessageProducer : MessageLink, IMessageProducer
-    {
-        private IdGenerator msgIdGenerator;
-        private ISenderLink link;
-        private ProducerInfo producerInfo;
-        
-        // Stat fields
-        private int MsgsSentOnLink = 0;
-
-        #region Constructor
-
-        internal MessageProducer(Session ses, IDestination dest) : base(ses, dest)
-        {
-            producerInfo = new ProducerInfo(ses.ProducerIdGenerator.GenerateId());
-            Info = producerInfo;
-            Configure();
-            
-        }
-        
-        #endregion
-        
-        #region Internal Properties
-
-        internal Session InternalSession { get { return Session; } }
-
-        internal Id ProducerId { get { return producerInfo.Id; } }
-
-        #endregion
-
-        #region Private Methods
-        
-        private void ConfigureMessage(IMessage msg)
-        {
-            msg.NMSPriority = Priority;
-            msg.NMSDeliveryMode = DeliveryMode;
-            msg.NMSDestination = Destination;
-        }
-
-        private void OnAttachedResp(ILink link, Attach resp)
-        {
-            Tracer.InfoFormat("Received Performation Attach response on Link: {0}, Response: {1}", ProducerId, resp.ToString());
-            
-            OnResponse();
-        }
-        
-        internal void OnException(Exception e)
-        {
-            Session.OnException(e);
-        }
-
-        private Target CreateTarget()
-        {
-            Target t = new Target();
-            
-            t.Address = UriUtil.GetAddress(Destination, this.Session.Connection);
-
-            t.Timeout = (uint)producerInfo.sendTimeout;
-
-            // Durable is used for a durable subscription
-            t.Durable = (uint)TerminusDurability.NONE;
-
-            if (Destination != null)
-            {
-                t.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
-            }
-            t.Dynamic = false;
-            
-            return t;
-        }
-
-        private Source CreateSource()
-        {
-            Source s = new Source();
-            s.Address = this.ProducerId.ToString();
-            s.Timeout = (uint)producerInfo.sendTimeout;
-            s.Outcomes = new Amqp.Types.Symbol[]
-            {
-                SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
-                SymbolUtil.ATTACH_OUTCOME_REJECTED,
-            };
-            return s;
-        }
-
-        private Attach CreateAttachFrame()
-        {
-            Attach frame = new Attach();
-            frame.Source = CreateSource();
-            frame.Target = CreateTarget();
-            frame.SndSettleMode = SenderSettleMode.Unsettled;
-            frame.IncompleteUnsettled = false;
-            frame.InitialDeliveryCount = 0;
-            
-            return frame;
-        }
-
-        #endregion
-
-        #region MessageLink abstract Methods
-
-        protected override ILink CreateLink()
-        {
-            Attach frame = CreateAttachFrame();
-
-            string linkName = producerInfo.Id + ":" + UriUtil.GetAddress(Destination, Session.Connection);
-            link = new SenderLink(Session.InnerSession as Amqp.Session, linkName, frame, OnAttachedResp);
-            
-            return link;
-        }
-
-        protected override void OnInternalClosed(IAmqpObject sender, Error error)
-        {
-            base.OnInternalClosed(sender, error);
-            this.OnResponse();
-        }
-
-        #endregion
-
-        #region NMSResource Methods
-
-        protected override void StopResource()
-        {
-            
-        }
-
-        #endregion
-
-        #region IMessageProducer Properties
-
-        public MsgDeliveryMode DeliveryMode
-        {
-            get { return producerInfo.msgDelMode; }
-            set { producerInfo.msgDelMode = value; }
-        }
-
-        public bool DisableMessageID
-        {
-            get { return producerInfo.disableMsgId; }
-            set { producerInfo.disableMsgId = value; }
-        }
-
-        public bool DisableMessageTimestamp
-        {
-            get { return producerInfo.disableTimeStamp; }
-            set { producerInfo.disableTimeStamp = value; }
-        }
-
-        public MsgPriority Priority
-        {
-            get { return producerInfo.priority; }
-            set { producerInfo.priority = value; }
-        }
-
-        public ProducerTransformerDelegate ProducerTransformer
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-
-            set
-            {
-                throw new NotImplementedException();
-            }
-        }
-        
-        public TimeSpan TimeToLive
-        {
-            get { return TimeSpan.FromMilliseconds(producerInfo.ttl); }
-            set { producerInfo.ttl = Convert.ToInt64(value.TotalMilliseconds); }
-        }
-
-        #endregion
-
-        #region IMessageProducer Methods
-        
-        public IBytesMessage CreateBytesMessage()
-        {
-            this.ThrowIfClosed();
-            return Session.CreateBytesMessage();
-        }
-
-        public IBytesMessage CreateBytesMessage(byte[] body)
-        {
-            this.ThrowIfClosed();
-            IBytesMessage msg = CreateBytesMessage();
-            msg.WriteBytes(body);
-            return msg;
-        }
-
-        public IMapMessage CreateMapMessage()
-        {
-            this.ThrowIfClosed();
-            return Session.CreateMapMessage();
-        }
-
-        public IMessage CreateMessage()
-        {
-            this.ThrowIfClosed();
-            return Session.CreateMessage();
-        }
-
-        public IObjectMessage CreateObjectMessage(object body)
-        {
-            this.ThrowIfClosed();
-            return Session.CreateObjectMessage(body);
-        }
-
-        public IStreamMessage CreateStreamMessage()
-        {
-            this.ThrowIfClosed();
-            return Session.CreateStreamMessage();
-        }
-
-        public ITextMessage CreateTextMessage()
-        {
-            this.ThrowIfClosed();
-            return Session.CreateTextMessage();
-        }
-
-        public ITextMessage CreateTextMessage(string text)
-        {
-            ITextMessage msg = CreateTextMessage();
-            msg.Text = text;
-            return msg;
-        }
-
-        protected override void Dispose(bool disposing)
-        {
-            bool wasNotClosed = !IsClosed;
-            base.Dispose(disposing);
-            if (IsClosed && wasNotClosed)
-            {
-                Tracer.InfoFormat("Closing Producer {0}, MsgSentOnLink {1}", Id, MsgsSentOnLink);
-            }
-        }
-
-        public void Dispose()
-        {
-            this.Close();
-        }
-
-        public void Send(IMessage message)
-        {
-            Send(message, DeliveryMode, Priority, TimeToLive);
-        }
-
-        public void Send(IDestination destination, IMessage message)
-        {
-            Send(destination, message, DeliveryMode, Priority, TimeToLive);
-        }
-
-        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-        {
-            this.ThrowIfClosed();
-            if (Destination == null)
-            {
-                throw new IllegalStateException("Can not Send message on Anonymous Producer (without Destination).");
-            }
-            if (message == null)
-            {
-                throw new IllegalStateException("Can not Send a null message.");
-            }
-
-            DoSend(Destination, message, deliveryMode, priority, timeToLive);
-        }
-
-        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-        {
-            this.ThrowIfClosed();
-            if (Destination != null)
-            {
-                throw new IllegalStateException("Can not Send message on Fixed Producer (with Destination).");
-            }
-            if (message == null)
-            {
-                throw new IllegalStateException("Can not Send a null message.");
-            }
-
-            DoSend(destination, message, deliveryMode, priority, timeToLive);
-        }
-
-        #endregion
-
-        #region Protected Methods
-        
-        protected override void Configure()
-        {
-            base.Configure();
-        }
-
-        protected IdGenerator MessageIdGenerator
-        {
-            get
-            {
-                if (msgIdGenerator == null)
-                {
-                    msgIdGenerator = new CustomIdGenerator(
-                        true,
-                        "ID", 
-                        new AtomicSequence()
-                        );
-                }
-                return msgIdGenerator;
-            }
-        }
-
-        protected void PrepareMessageForSend(Message.Message message)
-        {
-            if (message == null) return;
-            if (message is Message.BytesMessage)
-            {
-                (message as Message.BytesMessage).Reset();
-            }
-            else if (message is Message.StreamMessage)
-            {
-                (message as Message.StreamMessage).Reset();
-            }
-            else
-            {
-                message.IsReadOnly = true;
-            }
-            message.IsReadOnlyProperties = true;
-        }
-
-        protected void DoSend(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-        {
-            this.Attach();
-            bool sendSync = deliveryMode.Equals(MsgDeliveryMode.Persistent);
-            if(destination.IsTemporary && (destination as TemporaryDestination).IsDeleted)
-            {
-                throw new InvalidDestinationException("Can not send message on deleted temporary topic.");
-            }
-            message.NMSDestination = destination;
-            message.NMSDeliveryMode = deliveryMode;
-            message.NMSPriority = priority;
-            // If there is  timeToLive, set it before setting NMSTimestamp as timeToLive
-            // is required to calculate absolute expiry time.
-            // TBD: If the messageProducer has a non-default timeToLive and the message
-            // already has a timeToLive set by application, which should take precedence, this
-            // code overwrites the message TimeToLive in this case but not if the producer TimeToLive
-            // is the default ...
-            if (timeToLive != NMSConstants.defaultTimeToLive)
-            {
-                message.NMSTimeToLive = timeToLive;
-            }
-            if (!DisableMessageTimestamp)
-            {
-                message.NMSTimestamp = DateTime.UtcNow;
-            }
-         
-
-            if (!DisableMessageID)
-            {
-                message.NMSMessageId = MessageIdGenerator.GenerateId().ToString();
-            }
-
-            Amqp.Message amqpmsg = null;
-            if (message is Message.Message)
-            {
-                Message.Message copy = (message as Message.Message).Copy();
-                copy.NMSDestination = DestinationTransformation.Transform(Session.Connection, destination);
-                PrepareMessageForSend(copy);
-                IMessageCloak cloak = copy.GetMessageCloak();
-                if (cloak is AMQPMessageCloak)
-                {
-                    amqpmsg = (cloak as AMQPMessageCloak).AMQPMessage;
-                }
-            }
-            else
-            {
-                Message.Message nmsmsg = this.Session.Connection.TransformFactory.TransformMessage<Message.Message>(message);
-                PrepareMessageForSend(nmsmsg);
-                IMessageCloak cloak = nmsmsg.GetMessageCloak().Copy();
-                if (cloak is AMQPMessageCloak)
-                {
-                    amqpmsg = (cloak as AMQPMessageCloak).AMQPMessage;
-                }
-            }
-
-            
-
-            if (amqpmsg != null)
-            {
-                if (Tracer.IsDebugEnabled)
-                    Tracer.DebugFormat("Sending message : {0}", message.ToString());
-                
-                if(sendSync)
-                {
-                    DoAMQPSendSync(amqpmsg, this.RequestTimeout);
-                }
-                else
-                {
-                    DoAMQPSendAsync(amqpmsg, HandleAsyncAMQPMessageOutcome);
-                }
-            }
-
-        }
-
-        protected void DoAMQPSendAsync(Amqp.Message amqpMessage, OutcomeCallback ackCallback)
-        {
-
-            try
-            {
-                this.link.Send(amqpMessage, ackCallback, this);
-                MsgsSentOnLink++;
-            }
-            catch(Exception ex)
-            {
-                Tracer.ErrorFormat("Encountered Error on sending message from Producer {0}. Message: {1}. Stack : {2}.", Id, ex.Message, ex.StackTrace);
-                throw ExceptionSupport.Wrap(ex);
-            }
-        }
-
-        private static void HandleAsyncAMQPMessageOutcome(Amqp.ILink sender, Amqp.Message message, Outcome outcome, object state)
-        {
-            MessageProducer thisPtr = state as MessageProducer;
-            Exception failure = null;
-            bool isProducerClosed = (thisPtr.IsClosing || thisPtr.IsClosed);
-            if (outcome.Descriptor.Name.Equals(MessageSupport.REJECTED_INSTANCE.Descriptor.Name) && !isProducerClosed)
-            {
-                string msgId = MessageSupport.CreateNMSMessageId(message.Properties.GetMessageId());
-                Error err = (outcome as Amqp.Framing.Rejected).Error;
-                failure = ExceptionSupport.GetException(err, "Msg {0} rejected", msgId);
-            }
-            else if (outcome.Descriptor.Name.Equals(MessageSupport.RELEASED_INSTANCE.Descriptor.Name) && !isProducerClosed)
-            {
-                string msgId = MessageSupport.CreateNMSMessageId(message.Properties.GetMessageId());
-                Error err = new Error(ErrorCode.MessageReleased);
-                err.Description = "AMQP Message has been release by peer.";
-                failure = ExceptionSupport.GetException(err, "Msg {0} released", msgId);
-            }
-            if (failure != null && !isProducerClosed)
-            {
-                thisPtr.OnException(failure);
-            }
-            
-        }
-
-        protected void DoAMQPSendSync(Amqp.Message amqpMessage, TimeSpan timeout)
-        {
-            try
-            {
-                this.link.Send(amqpMessage, timeout);
-                MsgsSentOnLink++;
-            }
-            catch (TimeoutException tex)
-            {
-                throw ExceptionSupport.GetTimeoutException(this.link, tex.Message);
-            }
-            catch (AmqpException amqpEx)
-            {
-                string messageId = MessageSupport.CreateNMSMessageId(amqpMessage.Properties.GetMessageId());
-                throw ExceptionSupport.Wrap(amqpEx, "Failure to send message {1} on Producer {0}", this.Id, messageId);
-            }
-            catch (Exception ex)
-            {
-                Tracer.ErrorFormat("Encountered Error on sending message from Producer {0}. Message: {1}. Stack : {2}.", Id, ex.Message, ex.StackTrace);
-                throw ExceptionSupport.Wrap(ex);
-            }
-        }
-
-        #endregion
-
-        
-    }
-
-    #region Producer Info Class
-
-    internal class ProducerInfo : LinkInfo
-    {
... 27676 lines suppressed ...