You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/25 23:19:12 UTC

svn commit: r1545423 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/ src/main/csharp/Protocol/ src/main/csharp/Transport/ src/main/csharp/Transport/Tcp/ src/test/csharp/

Author: tabish
Date: Mon Nov 25 22:19:12 2013
New Revision: 1545423

URL: http://svn.apache.org/r1545423
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Mon Nov 25 22:19:12 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyVersionAttribute("1.7.0.3250")]
 [assembly: AssemblyInformationalVersionAttribute("1.7.0")]
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Mon Nov 25 22:19:12 2013
@@ -107,7 +107,7 @@ namespace Apache.NMS.MQTT
 				}
 
 				listener += value;
-				//this.session.Redispatch(this.unconsumedMessages);
+				this.session.Redispatch(this.unconsumedMessages);
 
 				if(wasStarted)
 				{
@@ -219,7 +219,7 @@ namespace Apache.NMS.MQTT
 				MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
 				if(dispatch != null)
 				{
-					//this.Dispatch(dispatch);
+					this.Dispatch(dispatch);
 					return true;
 				}
 			}

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,47 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.IO;
+using Apache.NMS.MQTT.Transport;
+
+namespace Apache.NMS.MQTT.Protocol
+{
+	public class MQTTWireFormat : IWireFormat
+	{
+		private ITransport transport;
+
+		public MQTTWireFormat()
+		{
+		}
+
+        public void Marshal(Object o, BinaryWriter ds)
+		{
+		}
+
+        public Object Unmarshal(BinaryReader dis)
+		{
+			return null;
+		}
+
+		public ITransport Transport
+		{
+			get { return this.transport; }
+			set { this.transport = value; }
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Mon Nov 25 22:19:12 2013
@@ -18,6 +18,7 @@ using System;
 using System.Collections;
 using System.Threading;
 using Apache.NMS.MQTT.Messages;
+using Apache.NMS.MQTT.Util;
 
 namespace Apache.NMS.MQTT
 {
@@ -512,6 +513,17 @@ namespace Apache.NMS.MQTT
             }
         }
 
+        internal void Redispatch(MessageDispatchChannel channel)
+        {
+            MessageDispatch[] messages = channel.RemoveAll();
+            System.Array.Reverse(messages);
+
+            foreach(MessageDispatch message in messages)
+            {
+                this.executor.ExecuteFirst(message);
+            }
+        }
+
         public void Dispatch(MessageDispatch dispatch)
         {
             if(this.executor != null)

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs Mon Nov 25 22:19:12 2013
@@ -1,20 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
 using System;
 
 namespace Apache.NMS.MQTT.Transport
@@ -178,6 +177,14 @@ namespace Apache.NMS.MQTT.Transport
 		/// </param>
 		void UpdateURIs(bool rebalance, Uri[] updatedURIs);
 
+        /// <summary>
+        /// Returns the IWireFormat object that this transport uses to marshal and
+        /// unmarshal Command objects.
+        /// </summary>
+        IWireFormat WireFormat
+        {
+            get;
+        }
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -1,30 +1,26 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
 using System;
 
 namespace Apache.NMS.MQTT.Transport
 {
-	public delegate void SetTransport(ITransport transport, Uri uri);
-
 	public interface ITransportFactory
 	{
 		ITransport CreateTransport(Uri location);
 		ITransport CompositeConnect(Uri location);
-		ITransport CompositeConnect(Uri location, SetTransport setTransport);
 	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,42 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.IO;
+
+namespace Apache.NMS.MQTT.Transport
+{
+    /// <summary>
+    /// Represents the marshalling of commands to and from an IO stream
+    /// </summary>
+    public interface IWireFormat
+    {
+        /// <summary>
+        /// Marshalls the given command object onto the stream
+        /// </summary>
+        void Marshal(Object o, BinaryWriter ds);
+
+        /// <summary>
+        /// Unmarshalls the next command object from the stream
+        /// </summary>
+        Object Unmarshal(BinaryReader dis);
+
+        ITransport Transport {
+            get; set;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/IWireFormat.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,50 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+	/// <summary>
+	/// A Transport filter that is used to log the commands sent and received.
+	/// </summary>
+	public class LoggingTransport : TransportFilter
+    {
+		public LoggingTransport(ITransport next) : base(next) 
+		{
+		}
+		
+		protected override void OnCommand(ITransport sender, Command command) 
+		{
+			Tracer.Info("RECEIVED: " + command);
+			this.commandHandler(sender, command);
+		}
+		
+		protected override void OnException(ITransport sender, Exception error)
+		{
+			Tracer.Error("RECEIVED Exception: " + error);
+			this.exceptionHandler(sender, error);
+		}
+		
+		public override void Oneway(Command command)
+		{
+			Tracer.Info("SENDING: " + command);
+			this.next.Oneway(command);
+		}
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/LoggingTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,102 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Transport
+{
+	/// <summary>
+	/// A Transport which guards access to the next transport using a mutex.
+	/// </summary>
+	public class MutexTransport : TransportFilter
+	{
+		private readonly object transmissionLock = new object();
+
+		private void GetTransmissionLock(int timeout)
+		{
+			if(timeout > 0)
+			{
+				DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout);
+				int waitCount = 1;
+
+				while(true)
+				{
+					if(Monitor.TryEnter(transmissionLock))
+					{
+						break;
+					}
+
+					if(DateTime.Now > timeoutTime)
+					{
+						throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
+					}
+
+					// Back off from being overly aggressive.  Having too many threads
+					// aggressively trying to get the lock pegs the CPU.
+					Thread.Sleep(3 * (waitCount++));
+				}
+			}
+			else
+			{
+				Monitor.Enter(transmissionLock);
+			}
+		}
+
+		public MutexTransport(ITransport next) : base(next)
+		{
+		}
+
+		public override void Oneway(Command command)
+		{
+			GetTransmissionLock(this.next.Timeout);
+			try
+			{
+				base.Oneway(command);
+			}
+			finally
+			{
+				Monitor.Exit(transmissionLock);
+			}
+		}
+
+		public override FutureResponse AsyncRequest(Command command)
+		{
+			GetTransmissionLock(this.next.AsyncTimeout);
+			try
+			{
+				return base.AsyncRequest(command);
+			}
+			finally
+			{
+				Monitor.Exit(transmissionLock);
+			}
+		}
+
+		public override Response Request(Command command, TimeSpan timeout)
+		{
+			GetTransmissionLock((int) timeout.TotalMilliseconds);
+			try
+			{
+				return base.Request(command, timeout);
+			}
+			finally
+			{
+				Monitor.Exit(transmissionLock);
+			}
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MutexTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,158 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Collections;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+    /// <summary>
+    /// A Transport that correlates asynchronous send/receive messages into single request/response.
+    /// </summary>
+    public class ResponseCorrelator : TransportFilter
+    {
+        private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+        private int nextCommandId;
+		private Exception error;
+
+        public ResponseCorrelator(ITransport next) : base(next)
+        {
+        }
+
+        protected override void OnException(ITransport sender, Exception command)
+        {
+			Dispose(command);
+            base.OnException(sender, command);
+        }
+
+        internal int GetNextCommandId()
+        {
+            return Interlocked.Increment(ref nextCommandId);
+        }
+
+        public override void Oneway(Command command)
+        {
+//            command.CommandId = GetNextCommandId();
+//			command.ResponseRequired = false;
+            next.Oneway(command);
+        }
+
+        public override FutureResponse AsyncRequest(Command command)
+        {
+            int commandId = GetNextCommandId();
+
+//            command.CommandId = commandId;
+//            command.ResponseRequired = true;
+            FutureResponse future = new FutureResponse();
+	        Exception priorError = null;
+	        lock(requestMap.SyncRoot) 
+			{
+	            priorError = this.error;
+	            if(priorError == null) 
+				{
+		            requestMap[commandId] = future;
+	            }
+	        }
+	
+	        if(priorError != null) 
+			{
+//				BrokerError brError = new BrokerError();
+//				brError.Message = priorError.Message;
+//				ExceptionResponse response = new ExceptionResponse();
+//				response.Exception = brError;
+//	            future.Response = response;
+                return future;
+	        }
+			
+            next.Oneway(command);
+
+			return future;
+        }
+
+        public override Response Request(Command command, TimeSpan timeout)
+        {
+            FutureResponse future = AsyncRequest(command);
+            future.ResponseTimeout = timeout;
+            Response response = future.Response;
+            return response;
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if(command.IsResponse)
+            {
+                Response response = (Response) command;
+                int correlationId = response.CorrelationId;
+                FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+                if(future != null)
+                {
+                    requestMap.Remove(correlationId);
+                    future.Response = response;
+                }
+                else
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("Unknown response ID: " + response.CorrelationId + " for response: " + response);
+                    }
+                }
+            }
+            else
+            {
+                this.commandHandler(sender, command);
+            }
+        }
+		
+		public override void Stop()
+		{
+			this.Dispose(new IOException("Stopped"));
+			base.Stop();
+		}
+		
+		private void Dispose(Exception error)
+		{
+			ArrayList requests = null;
+			
+	        lock(requestMap.SyncRoot) 
+			{
+	            if(this.error == null) 
+				{
+	                this.error = error;
+	                requests = new ArrayList(requestMap.Values);
+	                requestMap.Clear();
+	            }
+	        }
+			
+	        if(requests != null)
+			{
+				foreach(FutureResponse future in requests)
+				{
+//					BrokerError brError = new BrokerError();
+//					brError.Message = error.Message;
+//					ExceptionResponse response = new ExceptionResponse();
+//					response.Exception = brError;
+//		            future.Response = response;
+				}
+	        }
+		}
+		
+    }
+}
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Net.Security;
+using System.Security.Authentication;
+using System.Security.Cryptography.X509Certificates;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+    public class SslTransport : TcpTransport
+    {
+        private string serverName;
+        private string clientCertSubject;
+        private string clientCertFilename;
+        private string clientCertPassword;
+        private string keyStoreName;
+        private string keyStoreLocation;
+        private bool acceptInvalidBrokerCert = false;
+
+        private SslStream sslStream;
+
+        public SslTransport(Uri location, Socket socket, IWireFormat wireFormat) :
+            base(location, socket, wireFormat)
+        {
+        }
+
+        ~SslTransport()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Indicates the name of the Server's Certificate.  By default the Host name
+        /// of the remote server is used, however if this doesn't match the name of the
+        /// Server's certificate then this option can be set to override the default.
+        /// </summary>
+        public string ServerName
+        {
+            get { return this.serverName; }
+            set { this.serverName = value; }
+        }
+
+        public string ClientCertSubject
+        {
+            get { return this.clientCertSubject; }
+            set { this.clientCertSubject = value; }
+        }
+
+        /// <summary>
+        /// Indicates the location of the Client Certificate to use when the Broker
+        /// is configured for Client Auth (not common).  The SslTransport will supply
+        /// this certificate to the SslStream via the SelectLocalCertificate method.
+        /// </summary>
+        public string ClientCertFilename
+        {
+            get { return this.clientCertFilename; }
+            set { this.clientCertFilename = value; }
+        }
+
+        /// <summary>
+        /// Password for the Client Certificate specified via configuration.
+        /// </summary>
+        public string ClientCertPassword
+        {
+            get { return this.clientCertPassword; }
+            set { this.clientCertPassword = value; }
+        }
+
+        /// <summary>
+        /// Indicates if the SslTransport should ignore any errors in the supplied Broker
+        /// certificate and connect anyway, this is useful in testing with a default AMQ
+        /// broker certificate that is self signed.
+        /// </summary>
+        public bool AcceptInvalidBrokerCert
+        {
+            get { return this.acceptInvalidBrokerCert; }
+            set { this.acceptInvalidBrokerCert = value; }
+        }
+
+        public string KeyStoreName
+        {
+            get { return this.keyStoreName; }
+            set { this.keyStoreName = value; }
+        }
+
+        public string KeyStoreLocation
+        {
+            get { return this.keyStoreLocation; }
+            set { this.keyStoreLocation = value; }
+        }
+
+        protected override Stream CreateSocketStream()
+        {
+            if(this.sslStream != null)
+            {
+                return this.sslStream;
+            }
+
+            this.sslStream = new SslStream(
+                new NetworkStream(this.socket),
+                false,
+                new RemoteCertificateValidationCallback(ValidateServerCertificate),
+                new LocalCertificateSelectionCallback(SelectLocalCertificate) );
+
+            try
+            {
+
+                string remoteCertName = this.serverName ?? this.RemoteAddress.Host;
+                Tracer.Debug("Authorizing as Client for Server: " + remoteCertName);
+                sslStream.AuthenticateAsClient(remoteCertName, LoadCertificates(), SslProtocols.Default, false);
+                Tracer.Debug("Server is Authenticated = " + sslStream.IsAuthenticated);
+                Tracer.Debug("Server is Encrypted = " + sslStream.IsEncrypted);
+            }
+            catch(Exception e)
+            {
+                Tracer.ErrorFormat("Exception: {0}", e.Message);
+                if(e.InnerException != null)
+                {
+                    Tracer.ErrorFormat("Inner exception: {0}", e.InnerException.Message);
+                }
+                Tracer.Error("Authentication failed - closing the connection.");
+
+                throw;
+            }
+
+            return sslStream;
+        }
+
+        private bool ValidateServerCertificate(object sender,
+                                               X509Certificate certificate,
+                                               X509Chain chain,
+                                               SslPolicyErrors sslPolicyErrors)
+        {
+            Tracer.DebugFormat("ValidateServerCertificate: Issued By {0}", certificate.Issuer);
+            if(sslPolicyErrors == SslPolicyErrors.None)
+            {
+                return true;
+            }
+
+            Tracer.WarnFormat("Certificate error: {0}", sslPolicyErrors.ToString());
+            if(sslPolicyErrors == SslPolicyErrors.RemoteCertificateChainErrors)
+            {
+                Tracer.Error("Chain Status errors: ");
+                foreach( X509ChainStatus status in chain.ChainStatus )
+                {
+                    Tracer.Error("*** Chain Status error: " + status.Status);
+                    Tracer.Error("*** Chain Status information: " + status.StatusInformation);
+                }
+            }
+            else if(sslPolicyErrors == SslPolicyErrors.RemoteCertificateNameMismatch)
+            {
+                Tracer.Error("Mismatch between Remote Cert Name.");
+            }
+            else if(sslPolicyErrors == SslPolicyErrors.RemoteCertificateNotAvailable)
+            {
+                Tracer.Error("The Remote Certificate was not Available.");
+            }
+
+            // Configuration may or may not allow us to connect with an invliad broker cert.
+            return AcceptInvalidBrokerCert;
+        }
+
+        private X509Certificate SelectLocalCertificate(object sender,
+                                                       string targetHost,
+                                                       X509CertificateCollection localCertificates,
+                                                       X509Certificate remoteCertificate,
+                                                       string[] acceptableIssuers)
+        {
+            Tracer.DebugFormat("Client is selecting a local certificate from {0} possibilities.", localCertificates.Count);
+
+            if(localCertificates.Count == 1)
+            {
+                Tracer.Debug("Client has selected certificate with Subject = " + localCertificates[0].Subject);
+                return localCertificates[0];
+            }
+            else if(localCertificates.Count > 1 && this.clientCertSubject != null)
+            {
+                foreach(X509Certificate2 certificate in localCertificates)
+                {
+                    Tracer.Debug("Checking Client Certificate := " + certificate.ToString());
+                    if(String.Compare(certificate.Subject, this.clientCertSubject, true) == 0)
+                    {
+                        Tracer.Debug("Client has selected certificate with Subject = " + certificate.Subject);
+                        return certificate;
+                    }
+                }
+            }
+
+            Tracer.Debug("Client did not select a Certificate, returning null.");
+            return null;
+        }
+
+        private X509Certificate2Collection LoadCertificates()
+        {
+            X509Certificate2Collection collection = new X509Certificate2Collection();
+
+            if(!String.IsNullOrEmpty(this.clientCertFilename))
+            {
+                Tracer.Debug("Attempting to load Client Certificate from file := " + this.clientCertFilename);
+                X509Certificate2 certificate = new X509Certificate2(this.clientCertFilename, this.clientCertPassword);
+                Tracer.Debug("Loaded Client Certificate := " + certificate.ToString());
+
+                collection.Add(certificate);
+            }
+            else
+            {
+                string name = String.IsNullOrEmpty(this.keyStoreName) ? StoreName.My.ToString() : this.keyStoreName;
+
+                StoreLocation location = StoreLocation.CurrentUser;
+
+                if(!String.IsNullOrEmpty(this.keyStoreLocation))
+                {
+                    if(String.Compare(this.keyStoreLocation, "CurrentUser", true) == 0)
+                    {
+                        location = StoreLocation.CurrentUser;
+                    }
+                    else if(String.Compare(this.keyStoreLocation, "LocalMachine", true) == 0)
+                    {
+                        location = StoreLocation.LocalMachine;
+                    }
+                    else
+                    {
+                        throw new NMSException("Invlalid StoreLocation given on URI");
+                    }
+                }
+
+                X509Store store = new X509Store(name, location);
+                store.Open(OpenFlags.ReadOnly);
+                collection = store.Certificates;
+                store.Close();
+            }
+
+            return collection;
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,96 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Web;
+using System.Net.Sockets;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+    [MQTTTransportFactory("ssl")]
+	public class SslTransportFactory : TcpTransportFactory
+	{
+        private string serverName;
+        private string clientCertSubject;
+        private string clientCertFilename;
+        private string clientCertPassword;
+        private string keyStoreName;
+        private string keyStoreLocation;
+        private bool acceptInvalidBrokerCert = false;
+        
+        public SslTransportFactory() : base()
+        {
+        }
+
+        public string ServerName
+        {
+            get { return this.serverName; }
+            set { this.serverName = value; }
+        }
+
+        public string ClientCertSubject
+        {
+            get { return this.clientCertSubject; }
+            set { this.clientCertSubject = value; }
+        }
+
+        public string ClientCertFilename
+        {
+            get { return this.clientCertFilename; }
+            set { this.clientCertFilename = value; }
+        }
+
+        public string ClientCertPassword
+        {
+            get { return this.clientCertPassword; }
+            set { this.clientCertPassword = value; }
+        }        
+
+        public bool AcceptInvalidBrokerCert
+        {
+            get { return this.acceptInvalidBrokerCert; }
+            set { this.acceptInvalidBrokerCert = value; }
+        }
+
+        public string KeyStoreName
+        {
+            get { return this.keyStoreName; }
+            set { this.keyStoreName = value; }
+        }
+
+        public string KeyStoreLocation
+        {
+            get { return this.keyStoreLocation; }
+            set { this.keyStoreLocation = value; }
+        }
+
+		protected override ITransport DoCreateTransport(Uri location, Socket socket, IWireFormat wireFormat )
+		{
+            Tracer.Debug("Creating new instance of the SSL Transport.");
+			SslTransport transport = new SslTransport(location, socket, wireFormat);
+
+            transport.ClientCertSubject = HttpUtility.UrlDecode(this.clientCertSubject);
+            transport.ClientCertFilename = this.clientCertFilename;
+            transport.ClientCertPassword = this.clientCertPassword;
+            transport.ServerName = this.serverName;
+            transport.KeyStoreLocation = this.keyStoreLocation;
+            transport.KeyStoreName = this.keyStoreName;
+            transport.AcceptInvalidBrokerCert = this.acceptInvalidBrokerCert;
+            
+            return transport;
+		}		
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/SslTransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+	/// <summary>
+	/// An implementation of ITransport that uses sockets to communicate with the broker
+	/// </summary>
+	public class TcpTransport : ITransport
+	{
+		protected readonly object myLock = new object();
+		protected readonly Socket socket;
+		private IWireFormat wireformat;
+		private BinaryReader socketReader;
+		private BinaryWriter socketWriter;
+		private Thread readThread;
+		private bool started;
+		private bool disposed = false;
+		private readonly Atomic<bool> closed = new Atomic<bool>(false);
+		private volatile bool seenShutdown;
+		private readonly Uri connectedUri;
+		private int timeout = -1;
+		private int asynctimeout = -1;
+
+		private CommandHandler commandHandler;
+		private ExceptionHandler exceptionHandler;
+		private InterruptedHandler interruptedHandler;
+		private ResumedHandler resumedHandler;
+		private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+        /// <summary>
+        /// Size in bytes of the receive buffer.
+        /// </summary>
+        private int receiveBufferSize = 8192;
+        public int ReceiveBufferSize
+        {
+            get { return receiveBufferSize; }
+            set { receiveBufferSize = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of send buffer.
+        /// </summary>
+        private int sendBufferSize = 8192;
+        public int SendBufferSize
+        {
+            get { return sendBufferSize; }
+            set { sendBufferSize = value; }
+        }
+
+		public TcpTransport(Uri uri, Socket socket, IWireFormat wireformat)
+		{
+			this.connectedUri = uri;
+			this.socket = socket;
+			this.wireformat = wireformat;
+		}
+
+		~TcpTransport()
+		{
+			Dispose(false);
+		}
+
+		protected virtual Stream CreateSocketStream()
+		{
+			return new NetworkStream(socket);
+		}
+
+		/// <summary>
+		/// Method Start
+		/// </summary>
+		public void Start()
+		{
+			lock(myLock)
+			{
+				if(!started)
+				{
+					if(null == commandHandler)
+					{
+						throw new InvalidOperationException(
+								"command cannot be null when Start is called.");
+					}
+
+					if(null == exceptionHandler)
+					{
+						throw new InvalidOperationException(
+								"exception cannot be null when Start is called.");
+					}
+
+					started = true;
+
+					// Initialize our Read and Writer instances.  Its not actually necessary
+					// to have two distinct NetworkStream instances but for now the TcpTransport
+					// will continue to do so for legacy reasons.
+					socketWriter = new EndianBinaryWriter(new BufferedStream(CreateSocketStream(), sendBufferSize));
+					socketReader = new EndianBinaryReader(new BufferedStream(CreateSocketStream(), receiveBufferSize));
+
+					// now lets create the background read thread
+					readThread = new Thread(new ThreadStart(ReadLoop)) { IsBackground = true };
+					readThread.Start();
+				}
+			}
+		}
+
+		/// <summary>
+		/// Property IsStarted
+		/// </summary>
+		public bool IsStarted
+		{
+			get
+			{
+				lock(myLock)
+				{
+					return started;
+				}
+			}
+		}
+
+		public virtual void Oneway(Command command)
+		{
+			lock(myLock)
+			{
+				if(closed.Value)
+				{
+					this.exceptionHandler(this, new InvalidOperationException("Error writing to broker.  Transport connection is closed."));
+					return;
+				}
+
+				if(command is DISCONNECT)
+				{
+					seenShutdown = true;
+				}
+
+				WireFormat.Marshal(command, socketWriter);
+			}
+		}
+
+		public FutureResponse AsyncRequest(Command command)
+		{
+			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
+		}
+
+		public bool TcpNoDelayEnabled
+		{
+#if !NETCF
+			get { return this.socket.NoDelay; }
+			set { this.socket.NoDelay = value; }
+#else
+			get { return false; }
+			set { }
+#endif
+		}
+
+		public Response Request(Command command)
+		{
+			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+		}
+
+		public Response Request(Command command, TimeSpan timeout)
+		{
+			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+		}
+
+		public void Stop()
+		{
+			Close();
+		}
+
+		public void Close()
+		{
+			lock(myLock)
+			{
+				if(closed.CompareAndSet(false, true))
+				{
+					try
+					{
+						socket.Shutdown(SocketShutdown.Both);
+					}
+					catch
+					{
+					}
+
+					try
+					{
+						if(null != socketWriter)
+						{
+							socketWriter.Close();
+						}
+					}
+					catch
+					{
+					}
+					finally
+					{
+						socketWriter = null;
+					}
+
+					try
+					{
+						if(null != socketReader)
+						{
+							socketReader.Close();
+						}
+					}
+					catch
+					{
+					}
+					finally
+					{
+						socketReader = null;
+					}
+
+					try
+					{
+						socket.Close();
+					}
+					catch
+					{
+					}
+
+					if(null != readThread)
+					{
+						if(Thread.CurrentThread != readThread && readThread.IsAlive)
+						{
+							if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
+							{
+								readThread.Abort();
+							}
+						}
+
+						readThread = null;
+					}
+
+					started = false;
+				}
+			}
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			Close();
+			disposed = true;
+		}
+
+		public bool IsDisposed
+		{
+			get
+			{
+				return disposed;
+			}
+		}
+
+		public void ReadLoop()
+		{
+			// This is the thread function for the reader thread. This runs continuously
+			// performing a blokcing read on the socket and dispatching all commands
+			// received.
+			//
+			// Exception Handling
+			// ------------------
+			// If an Exception occurs during the reading/marshalling, then the connection
+			// is effectively broken because position cannot be re-established to the next
+			// message.  This is reported to the app via the exceptionHandler and the socket
+			// is closed to prevent further communication attempts.
+			//
+			// An exception in the command handler may not be fatal to the transport, so
+			// these are simply reported to the exceptionHandler.
+			//
+			while(!closed.Value)
+			{
+				Command command = null;
+
+				try
+				{
+					command = (Command) WireFormat.Unmarshal(socketReader);
+				}
+				catch(Exception ex)
+				{
+					command = null;
+					if(!closed.Value)
+					{
+						// Close the socket as there's little that can be done with this transport now.
+						Close();
+						if(!seenShutdown)
+						{
+							this.exceptionHandler(this, ex);
+						}
+					}
+
+					break;
+				}
+
+				try
+				{
+					if(command != null)
+					{
+						this.commandHandler(this, command);
+					}
+				}
+				catch(Exception e)
+				{
+					this.exceptionHandler(this, e);
+				}
+			}
+		}
+
+		// Implementation methods
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending synchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		public int Timeout
+		{
+			get { return this.timeout; }
+			set { this.timeout = value; }
+		}
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		public int AsyncTimeout
+		{
+			get { return this.asynctimeout; }
+			set { this.asynctimeout = value; }
+		}
+
+		public CommandHandler Command
+		{
+			get { return commandHandler; }
+			set { this.commandHandler = value; }
+		}
+
+		public ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { this.exceptionHandler = value; }
+		}
+
+		public InterruptedHandler Interrupted
+		{
+			get { return interruptedHandler; }
+			set { this.interruptedHandler = value; }
+		}
+
+		public ResumedHandler Resumed
+		{
+			get { return resumedHandler; }
+			set { this.resumedHandler = value; }
+		}
+
+		public IWireFormat WireFormat
+		{
+			get { return wireformat; }
+			set { wireformat = value; }
+		}
+
+		public bool IsFaultTolerant
+		{
+			get { return false; }
+		}
+
+		public bool IsConnected
+		{
+			get { return socket.Connected; }
+		}
+
+		public Uri RemoteAddress
+		{
+			get { return connectedUri; }
+		}
+
+		public Object Narrow(Type type)
+		{
+			return this.GetType().Equals(type) ? this : null;
+		}
+
+		public bool IsReconnectSupported
+		{
+			get { return false; }
+		}
+
+		public bool IsUpdateURIsSupported
+		{
+			get { return false; }
+		}
+
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+		{
+			throw new IOException();
+		}
+	}
+}
+
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using System.Net;
+using System.Net.Sockets;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Protocol;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Transport.Tcp
+{
+    [MQTTTransportFactory("tcp")]
+    public class TcpTransportFactory : ITransportFactory
+    {
+        public TcpTransportFactory()
+        {
+        }
+
+        #region Properties
+
+        private bool useLogging = false;
+        public bool UseLogging
+        {
+            get { return useLogging; }
+            set { useLogging = value; }
+        }
+
+        /// <summary>
+        /// Should the Inactivity Monitor be enabled on this Transport.
+        /// </summary>
+        private bool useInactivityMonitor = true;
+        public bool UseInactivityMonitor
+        {
+           get { return this.useInactivityMonitor; }
+           set { this.useInactivityMonitor = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of the receive buffer.
+        /// </summary>
+        private int receiveBufferSize = 8192;
+        public int ReceiveBufferSize
+        {
+            get { return receiveBufferSize; }
+            set { receiveBufferSize = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of send buffer.
+        /// </summary>
+        private int sendBufferSize = 8192;
+        public int SendBufferSize
+        {
+            get { return sendBufferSize; }
+            set { sendBufferSize = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. The default value is 0, which indicates
+        /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int receiveTimeout = 0;
+        public int ReceiveTimeout
+        {
+            get { return receiveTimeout; }
+            set { receiveTimeout = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+        /// the value will be changed to 500. The default value is 0, which indicates an infinite
+        /// time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int sendTimeout = 0;
+        public int SendTimeout
+        {
+            get { return sendTimeout; }
+            set { sendTimeout = value; }
+        }
+
+		private int connectTimeout = 30000;
+		public int ConnectTimeout
+		{
+			get { return connectTimeout; }
+			set { this.connectTimeout = value; }
+		}
+
+        #endregion
+
+        #region ITransportFactory Members
+
+        public ITransport CompositeConnect(Uri location)
+        {
+            // Extract query parameters from broker Uri
+            StringDictionary map = URISupport.ParseQuery(location.Query);
+
+            // Set transport. properties on this (the factory)
+            URISupport.SetProperties(this, map, "transport.");
+
+			// See if there is a local address and port specified
+			string localAddress = null;
+			int localPort = -1;
+			
+			if(!String.IsNullOrEmpty(location.AbsolutePath) && !location.AbsolutePath.Equals("/"))
+			{
+				int index = location.AbsolutePath.IndexOf(':');
+				try
+				{
+					localPort = Int16.Parse(location.AbsolutePath.Substring(index + 1));					
+					localAddress = location.AbsolutePath.Substring(1, index - 1);
+					Tracer.DebugFormat("Binding Socket to {0} on port: {1}", localAddress, localPort);
+				}
+				catch
+				{
+            		Tracer.Warn("Invalid Port value on URI for local bind option, ignoring.");
+				}
+			}
+			
+            Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);            
+			Socket socket = DoConnect(location.Host, location.Port, localAddress, localPort );
+			
+            socket.ReceiveBufferSize = ReceiveBufferSize;
+            socket.SendBufferSize = SendBufferSize;
+            socket.ReceiveTimeout = ReceiveTimeout;
+            socket.SendTimeout = SendTimeout;
+
+			MQTTWireFormat wireformat = new MQTTWireFormat();
+            ITransport transport = DoCreateTransport(location, socket, wireformat);
+
+            wireformat.Transport = transport;
+
+			// TODO
+//            if(UseLogging)
+//            {
+//                transport = new LoggingTransport(transport);
+//            }
+//
+//            if(UseInactivityMonitor)
+//            {
+//               transport = new InactivityMonitor(transport);
+//            }
+
+            return transport;
+        }
+
+        public ITransport CreateTransport(Uri location)
+        {
+            ITransport transport = CompositeConnect(location);
+
+            transport = new MutexTransport(transport);
+            transport = new ResponseCorrelator(transport);
+
+            return transport;
+        }
+
+        #endregion
+
+		/// <summary>
+		/// Override in a subclass to create the specific type of transport that is
+		/// being implemented.
+		/// </summary>
+		protected virtual ITransport DoCreateTransport(Uri location, Socket socket, IWireFormat wireFormat )
+		{
+			TcpTransport transport = new TcpTransport(location, socket, wireFormat);
+
+			// Apply the buffer sizes to the transport also so that it can buffer above the
+			// TCP level which can eagerly send causing sparse packets.
+			transport.SendBufferSize = SendBufferSize;
+			transport.ReceiveBufferSize = ReceiveBufferSize;
+
+			return transport;
+		}
+		
+        // DISCUSSION: Caching host entries may not be the best strategy when using the
+        // failover protocol.  The failover protocol needs to be very dynamic when looking
+        // up hostnames at runtime.  If old hostname->IP mappings are kept around, this may
+        // lead to runtime failures that could have been avoided by dynamically looking up
+        // the new hostname IP.
+#if CACHE_HOSTENTRIES
+        private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+        private static readonly object _syncLock = new object();
+#endif
+        public static IPHostEntry GetIPHostEntry(string host)
+        {
+            IPHostEntry ipEntry;
+
+#if CACHE_HOSTENTRIES
+            string hostUpperName = host.ToUpper();
+
+            lock (_syncLock)
+            {
+                if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+                {
+                    try
+                    {
+                        ipEntry = Dns.GetHostEntry(hostUpperName);
+                        CachedIPHostEntries.Add(hostUpperName, ipEntry);
+                    }
+                    catch
+                    {
+                        ipEntry = null;
+                    }
+                }
+            }
+#else
+            try
+            {
+                ipEntry = Dns.GetHostEntry(host);
+            }
+            catch
+            {
+                ipEntry = null;
+            }
+#endif
+
+            return ipEntry;
+        }
+
+        private Socket TryConnectSocket(IPAddress address, int port, string localAddress, int localPort)
+        {
+            if(null != address)
+            {
+                try
+                {
+                    Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+                    if(null != socket)
+                    {
+						if(!String.IsNullOrEmpty(localAddress))
+						{
+							DoBind(socket, localAddress, localPort);
+						}
+
+						IAsyncResult result = socket.BeginConnect(new IPEndPoint(address, port), null, null);
+						result.AsyncWaitHandle.WaitOne(ConnectTimeout, true);
+						if(!socket.Connected)
+						{
+				            socket.Close();
+						}
+						else
+						{
+							return socket;
+						}
+                    }
+                }
+                catch
+                {
+                }
+            }
+
+            return null;
+        }
+
+        public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+        {
+            return IPAddress.TryParse(host, out ipaddress);
+        }
+
+        public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+        {
+            IPAddress ipaddress = null;
+            IPHostEntry hostEntry = GetIPHostEntry(hostname);
+
+            if(null != hostEntry)
+            {
+                ipaddress = GetIPAddress(hostEntry, addressFamily);
+            }
+
+            return ipaddress;
+        }
+
+        public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+        {
+            if(null != hostEntry)
+            {
+                foreach(IPAddress address in hostEntry.AddressList)
+                {
+                    if(address.AddressFamily == addressFamily)
+                    {
+                        return address;
+                    }
+                }
+            }
+
+            return null;
+        }
+
+        protected Socket DoConnect(string host, int port, string localAddress, int localPort)
+        {
+            Socket socket = null;
+            IPAddress ipaddress;
+
+            try
+            {
+                if(TryParseIPAddress(host, out ipaddress))
+                {
+                    socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+                }
+                else
+                {
+                    // Looping through the AddressList allows different type of connections to be tried
+                    // (IPv6, IPv4 and whatever else may be available).
+                    IPHostEntry hostEntry = GetIPHostEntry(host);
+
+                    if(null != hostEntry)
+                    {
+                        // Prefer IPv6 first.
+                        ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+                        socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+                        if(null == socket)
+                        {
+                            // Try IPv4 next.
+                            ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+                            socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+                            if(null == socket)
+                            {
+                                // Try whatever else there is.
+                                foreach(IPAddress address in hostEntry.AddressList)
+                                {
+                                    if(AddressFamily.InterNetworkV6 == address.AddressFamily
+                                        || AddressFamily.InterNetwork == address.AddressFamily)
+                                    {
+                                        // Already tried these protocols.
+                                        continue;
+                                    }
+
+                            		socket = TryConnectSocket(ipaddress, port, localAddress, localPort);
+                                    if(null != socket)
+                                    {
+                                        ipaddress = address;
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if(null == socket)
+                {
+                    throw new SocketException();
+                }
+            }
+            catch(Exception ex)
+            {
+                throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+            }
+
+            Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
+            return socket;
+        }
+		
+        protected void DoBind(Socket socket, string host, int port)
+        {
+            IPAddress ipaddress;
+
+            try
+            {
+                if(TryParseIPAddress(host, out ipaddress))
+                {
+                    TryBindSocket(socket, ipaddress, port);
+                }
+                else
+                {
+                    // Looping through the AddressList allows different type of connections to be tried
+                    // (IPv6, IPv4 and whatever else may be available).
+                    IPHostEntry hostEntry = GetIPHostEntry(host);
+
+                    if(null != hostEntry)
+                    {
+                        // Prefer IPv6 first.
+                        ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+                        if(!TryBindSocket(socket, ipaddress, port))
+                        {
+                            // Try IPv4 next.
+                            ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+	                        if(!TryBindSocket(socket, ipaddress, port))
+                            {
+                                // Try whatever else there is.
+                                foreach(IPAddress address in hostEntry.AddressList)
+                                {
+                                    if(AddressFamily.InterNetworkV6 == address.AddressFamily
+                                        || AddressFamily.InterNetwork == address.AddressFamily)
+                                    {
+                                        // Already tried these protocols.
+                                        continue;
+                                    }
+
+                        			if(TryBindSocket(socket, ipaddress, port))
+                                    {
+                                        ipaddress = address;
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if(!socket.IsBound)
+                {
+                    throw new SocketException();
+                }
+            }
+            catch(Exception ex)
+            {
+                throw new NMSConnectionException(String.Format("Error binding to {0}:{1}.", host, port), ex);
+            }
+
+            Tracer.DebugFormat("Bound to {0}:{1} using.", host, port);
+        }		
+		
+        private bool TryBindSocket(Socket socket, IPAddress address, int port)
+        {
+            if(null != socket && null != address)
+            {
+                try
+                {
+                    socket.Bind(new IPEndPoint(address, port));
+                    if(socket.IsBound)
+                    {
+                        return true;
+                    }
+                }
+                catch
+                {
+                }
+            }
+
+            return false;
+        }
+		
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1545423&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs Mon Nov 25 22:19:12 2013
@@ -0,0 +1,267 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+	/// <summary>
+	/// Used to implement a filter on the transport layer.
+	/// </summary>
+	public class TransportFilter : ITransport
+	{
+		protected readonly ITransport next;
+		protected CommandHandler commandHandler;
+		protected ExceptionHandler exceptionHandler;
+		protected InterruptedHandler interruptedHandler;
+		protected ResumedHandler resumedHandler;
+		private bool disposed = false;
+
+		public TransportFilter(ITransport next)
+		{
+			this.next = next;
+			this.next.Command = new CommandHandler(OnCommand);
+			this.next.Exception = new ExceptionHandler(OnException);
+            this.next.Interrupted = new InterruptedHandler(OnInterrupted);
+            this.next.Resumed = new ResumedHandler(OnResumed);
+		}
+
+		~TransportFilter()
+		{
+			Dispose(false);
+		}
+
+		protected virtual void OnCommand(ITransport sender, Command command)
+		{
+			this.commandHandler(sender, command);
+		}
+
+		protected virtual void OnException(ITransport sender, Exception command)
+		{
+			this.exceptionHandler(sender, command);
+		}
+
+        protected virtual void OnInterrupted(ITransport sender)
+        {
+            if(this.interruptedHandler != null)
+            {
+                this.interruptedHandler(sender);
+            }
+        }
+
+        protected virtual void OnResumed(ITransport sender)
+        {
+            if(this.resumedHandler != null)
+            {
+                this.resumedHandler(sender);
+            }
+        }
+        
+		/// <summary>
+		/// Method Oneway
+		/// </summary>
+		/// <param name="command">A  Command</param>
+		public virtual void Oneway(Command command)
+		{
+			this.next.Oneway(command);
+		}
+
+		/// <summary>
+		/// Method AsyncRequest
+		/// </summary>
+		/// <returns>A FutureResponse</returns>
+		/// <param name="command">A  Command</param>
+		public virtual FutureResponse AsyncRequest(Command command)
+		{
+			return this.next.AsyncRequest(command);
+		}
+
+		/// <summary>
+		/// Method Request
+		/// </summary>
+		/// <returns>A Response</returns>
+		/// <param name="command">A  Command</param>
+		public virtual Response Request(Command command)
+		{			
+			return Request(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite));
+		}
+
+		/// <summary>
+		/// Method Request with time out for Response.
+		/// </summary>
+		/// <returns>A Response</returns>
+		/// <param name="command">A  Command</param>
+		/// <param name="timeout">Timeout in milliseconds</param>
+		public virtual Response Request(Command command, TimeSpan timeout)
+		{
+			return this.next.Request(command, timeout);
+		}
+
+		/// <summary>
+		/// Method Start
+		/// </summary>
+		public virtual void Start()
+		{
+			if(commandHandler == null)
+			{
+				throw new InvalidOperationException("command cannot be null when Start is called.");
+			}
+
+			if(exceptionHandler == null)
+			{
+				throw new InvalidOperationException("exception cannot be null when Start is called.");
+			}
+
+			this.next.Start();
+		}
+
+		/// <summary>
+		/// Property IsStarted
+		/// </summary>
+		public bool IsStarted
+		{
+			get { return this.next.IsStarted; }
+		}
+
+		/// <summary>
+		/// Method Dispose
+		/// </summary>
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected virtual void Dispose(bool disposing)
+		{
+			if(disposing && !disposed)
+			{
+                Tracer.Debug("TransportFilter disposing of next Transport: " +
+                             this.next.GetType().Name);
+				this.next.Dispose();
+			}
+			disposed = true;
+		}
+
+		public bool IsDisposed
+		{
+			get
+			{
+				return disposed;
+			}
+		}
+
+		public CommandHandler Command
+		{
+			get { return commandHandler; }
+			set { this.commandHandler = value; }
+		}
+
+		public ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { this.exceptionHandler = value; }
+		}
+
+		public InterruptedHandler Interrupted
+		{
+			get { return interruptedHandler; }
+			set { this.interruptedHandler = value; }
+		}
+		
+		public ResumedHandler Resumed
+		{
+			get { return resumedHandler; }
+			set { this.resumedHandler = value; }
+		}
+		
+		public virtual void Stop()
+		{
+            this.next.Stop();
+		}
+
+        public Object Narrow(Type type)
+        {
+            if (this.GetType().Equals(type)) 
+			{
+                return this;
+            } 
+			else if(this.next != null)
+			{
+                return this.next.Narrow( type );
+            }
+        
+            return null;
+        }
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending synchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		public int Timeout
+		{
+			get { return next.Timeout; }
+			set { next.Timeout = value; }
+		}
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		public int AsyncTimeout
+		{
+			get { return next.AsyncTimeout; }
+			set { next.AsyncTimeout = value; }
+		}
+		
+		public bool IsFaultTolerant
+        {
+            get { return next.IsFaultTolerant; }
+        }
+
+        public bool IsConnected
+        {
+            get { return next.IsConnected; }
+        }
+
+        public Uri RemoteAddress
+        {
+            get { return next.RemoteAddress; }
+        }
+		
+	    public bool IsReconnectSupported
+		{
+			get { return next.IsReconnectSupported; }
+		}
+	    
+	    public bool IsUpdateURIsSupported
+		{
+			get { return next.IsUpdateURIsSupported; }
+		}
+		
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+		{
+			next.UpdateURIs(rebalance, updatedURIs);
+		}
+
+        public IWireFormat WireFormat
+        {
+            get { return next.WireFormat; }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFilter.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs Mon Nov 25 22:19:12 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyVersionAttribute("1.7.0.3250")]
 [assembly: AssemblyInformationalVersionAttribute("1.7.0")]
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1545423&r1=1545422&r2=1545423&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Mon Nov 25 22:19:12 2013
@@ -34,6 +34,7 @@
     <Reference Include="Apache.NMS">
       <HintPath>lib\Apache.NMS\mono-2.0\Apache.NMS.dll</HintPath>
     </Reference>
+    <Reference Include="System.Web" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
   <ItemGroup>
@@ -93,6 +94,16 @@
     <Compile Include="src\main\csharp\RequestTimedOutException.cs" />
     <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
     <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Transport\Tcp\SslTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Tcp\SslTransportFactory.cs" />
+    <Compile Include="src\main\csharp\Transport\Tcp\TcpTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs" />
+    <Compile Include="src\main\csharp\Transport\IWireFormat.cs" />
+    <Compile Include="src\main\csharp\Transport\TransportFilter.cs" />
+    <Compile Include="src\main\csharp\Protocol\MQTTWireFormat.cs" />
+    <Compile Include="src\main\csharp\Transport\LoggingTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\MutexTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\ResponseCorrelator.cs" />
   </ItemGroup>
   <ItemGroup>
     <Folder Include="keyfile\" />