You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/27 01:22:00 UTC

svn commit: r1545889 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/ src/main/csharp/Protocol/ src/main/csharp/Transport/ src/test/csharp/

Author: tabish
Date: Wed Nov 27 00:21:59 2013
New Revision: 1545889

URL: http://svn.apache.org/r1545889
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Nov 27 00:21:59 2013
@@ -3,3 +3,4 @@ build
 lib
 vs2008-mqtt.userprefs
 bin
+test-results

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Wed Nov 27 00:21:59 2013
@@ -548,7 +548,11 @@ namespace Apache.NMS.MQTT
 										else
 										{
 											ErrorResponse error = response as ErrorResponse;
-											NMSException exception = error.Error;
+											Exception exception = error.Error;
+											if (!(exception is NMSException)) 
+											{
+												exception = new NMSException(exception.Message);
+											}
 											// This is non-recoverable.
 											// Shutdown the transport connection, and re-create it, but don't start it.
 											// It will be started if the connection is re-attempted.

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs Wed Nov 27 00:21:59 2013
@@ -32,6 +32,10 @@ namespace Apache.NMS.MQTT.Protocol
 
         public void Marshal(Command cmd, BinaryWriter ds)
 		{
+			Tracer.DebugFormat("MQTT Command being sent: {0}", cmd);
+
+			Console.WriteLine("MQTT Command being sent: {0}", cmd);
+
 			MemoryStream buffer = new MemoryStream();
 			EndianBinaryWriter writer = new EndianBinaryWriter(buffer);
 
@@ -41,6 +45,7 @@ namespace Apache.NMS.MQTT.Protocol
 			ds.Write(fixedHeader);
 			WriteLength((int)buffer.Length, ds);
             ds.Write(buffer.GetBuffer(), 0, (int) buffer.Length);
+			ds.Flush();
 		}
 
         public Command Unmarshal(BinaryReader dis)
@@ -67,16 +72,24 @@ namespace Apache.NMS.MQTT.Protocol
 				cmd.Decode(reader);
 			}
 
-			// A CONNACK is a response, but if it has an error code, then we create a suitable
-			// ErrorResponse here with the correct NMSException in its payload.
-			if (cmd.IsCONNACK && cmd.IsErrorResponse)
+			// A CONNACK is a response and its correlationId is always 1, but if it has an 
+            // error code, then we create a suitable ErrorResponse here with the correct 
+            // NMSException in its payload.
+			if (cmd.IsCONNACK) 
 			{
-				CONNACK connAck = cmd as CONNACK;
-				ErrorResponse error = new ErrorResponse();
-				error.Error = MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
-				cmd = error;
-			}
+    		    CONNACK connAck = cmd as CONNACK;
+                connAck.CorrelationId = 1;
+				if (cmd.IsErrorResponse)
+				{
+					ErrorResponse error = new ErrorResponse();
+					error.Error = MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
+					cmd = error;
+				}
+			} 
 
+			Tracer.DebugFormat("MQTT Command received: {0}", cmd);
+			Console.WriteLine("MQTT Command recieved: {0}", cmd);
+           
 			return cmd;
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs Wed Nov 27 00:21:59 2013
@@ -43,11 +43,13 @@ namespace Apache.NMS.MQTT.Transport
         short CommandId
         {
 			get;
+			set;
         }
 
         bool ResponseRequired
         {
 			get;
+			set;
         }
 
 		bool IsResponse

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs Wed Nov 27 00:21:59 2013
@@ -21,8 +21,8 @@ namespace Apache.NMS.MQTT.Transport
 {
 	public class ErrorResponse : Response
 	{
-		private NMSException error;
-		public NMSException Error
+		private Exception error;
+		public Exception Error
 		{
 			get { return error; }
 			set { this.error = value; }

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Wed Nov 27 00:21:59 2013
@@ -27,7 +27,7 @@ namespace Apache.NMS.MQTT.Transport
     public class ResponseCorrelator : TransportFilter
     {
         private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
-        private int nextCommandId;
+        private int nextCommandId = 1;  // 1 is always CONNECT -> CONNACK
 		private Exception error;
 
         public ResponseCorrelator(ITransport next) : base(next)
@@ -40,24 +40,34 @@ namespace Apache.NMS.MQTT.Transport
             base.OnException(sender, command);
         }
 
-        internal int GetNextCommandId()
+        internal short GetNextCommandId()
         {
-            return Interlocked.Increment(ref nextCommandId);
+			if (nextCommandId == UInt16.MaxValue)
+			{
+				nextCommandId = 1;
+			}
+            return (short) Interlocked.Increment(ref nextCommandId);
         }
 
         public override void Oneway(Command command)
         {
-//            command.CommandId = GetNextCommandId();
-//			command.ResponseRequired = false;
+            command.CommandId = GetNextCommandId();
+			command.ResponseRequired = false;
             next.Oneway(command);
         }
 
         public override FutureResponse AsyncRequest(Command command)
         {
-            int commandId = GetNextCommandId();
-
-//            command.CommandId = commandId;
-//            command.ResponseRequired = true;
+			Tracer.DebugFormat("ResponseCorrelator requesting: {0}", command);
+			if (command.IsCONNECT)
+			{
+				command.CommandId = 1;
+			}
+			else
+			{
+            	command.CommandId = GetNextCommandId();
+			}
+            command.ResponseRequired = true;
             FutureResponse future = new FutureResponse();
 	        Exception priorError = null;
 	        lock(requestMap.SyncRoot) 
@@ -65,17 +75,15 @@ namespace Apache.NMS.MQTT.Transport
 	            priorError = this.error;
 	            if(priorError == null) 
 				{
-		            requestMap[commandId] = future;
+		            requestMap[command.CommandId] = future;
 	            }
 	        }
 	
 	        if(priorError != null) 
 			{
-//				BrokerError brError = new BrokerError();
-//				brError.Message = priorError.Message;
-//				ExceptionResponse response = new ExceptionResponse();
-//				response.Exception = brError;
-//	            future.Response = response;
+				ErrorResponse response = new ErrorResponse();
+				response.Error = priorError;
+	            future.Response = response;
                 return future;
 	        }
 			
@@ -143,11 +151,9 @@ namespace Apache.NMS.MQTT.Transport
 			{
 				foreach(FutureResponse future in requests)
 				{
-//					BrokerError brError = new BrokerError();
-//					brError.Message = error.Message;
-//					ExceptionResponse response = new ExceptionResponse();
-//					response.Exception = brError;
-//		            future.Response = response;
+					ErrorResponse response = new ErrorResponse();
+					response.Error = error;
+		            future.Response = response;
 				}
 	        }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs Wed Nov 27 00:21:59 2013
@@ -25,24 +25,24 @@ namespace Apache.NMS.MQTT.Test
 	[TestFixture]
 	public class ConnectionFactoryTest
 	{
-//      [Test]
-//      [TestCase("mqtt:tcp://${activemqhost}:61613")]
-//      [TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
-//      [TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
-//		[TestCase("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
-//		[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("stomp:tcp://InvalidHost:61613?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("ftp://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("http://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("discovery://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("sms://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Test]
+        [TestCase("tcp://${activemqhost}:1883")]
+//      [TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
+//      [TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
+//		[TestCase("stomp:tcp://${activemqhost}:1883?connection.asyncsend=false")]
+//		[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("stomp:(tcp://${activemqhost}:1883)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("stomp:tcp://InvalidHost:1883?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("ftp://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("http://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("discovery://${activemqhost}:1888", ExpectedException = typeof(NMSConnectionException))]
+//		[TestCase("sms://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
 //		[TestCase("stomp:multicast://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
-//		[TestCase("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)", ExpectedException = typeof(UriFormatException))]
-//		[TestCase("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613", ExpectedException = typeof(UriFormatException))]
+//		[TestCase("(tcp://${activemqhost}:1883,tcp://${activemqhost}:1883)", ExpectedException = typeof(UriFormatException))]
+//		[TestCase("tcp://${activemqhost}:1883,tcp://${activemqhost}:1883", ExpectedException = typeof(UriFormatException))]
         public void TestURI(string connectionURI)
         {
             IConnectionFactory factory = new ConnectionFactory(
@@ -53,6 +53,23 @@ namespace Apache.NMS.MQTT.Test
                 Assert.IsNotNull(connection);
             }
         }
+
+        [Test]
+        [TestCase("tcp://${activemqhost}:1883")]
+        public void TestConnectionStarts(string connectionURI)
+        {
+			NMS.Tracer.Trace = new NmsConsoleTracer();
+            IConnectionFactory factory = new ConnectionFactory(
+				NMSTestSupport.ReplaceEnvVar(connectionURI));
+            Assert.IsNotNull(factory);
+            using(IConnection connection = factory.CreateConnection("", ""))
+            {
+                Assert.IsNotNull(connection);
+				// This should trigger a CONNECT frame and CONNACK response.
+				connection.Start();
+            }
+        }
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj Wed Nov 27 00:21:59 2013
@@ -6,7 +6,7 @@
     <ProductVersion>10.0.0</ProductVersion>
     <SchemaVersion>2.0</SchemaVersion>
     <ProjectGuid>{186FAC43-F3B0-4B03-82DA-EEC0169307B7}</ProjectGuid>
-    <OutputType>Exe</OutputType>
+    <OutputType>Library</OutputType>
     <RootNamespace>vs2008mqtttests</RootNamespace>
     <AssemblyName>vs2008-mqtt-tests</AssemblyName>
   </PropertyGroup>

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln Wed Nov 27 00:21:59 2013
@@ -21,6 +21,19 @@ Global
 		{AEBC857B-D693-4833-9F1E-F6A22787D0C9}.Release|x86.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(MonoDevelopProperties) = preSolution
-		StartupItem = vs2008-mqtt.csproj
+		StartupItem = vs2008-mqtt-tests.csproj
+		Policies = $0
+		$0.TextStylePolicy = $1
+		$1.inheritsSet = null
+		$1.scope = text/x-csharp
+		$0.CSharpFormattingPolicy = $2
+		$2.inheritsSet = Mono
+		$2.inheritsScope = text/x-csharp
+		$2.scope = text/x-csharp
+		$0.TextStylePolicy = $3
+		$3.FileWidth = 120
+		$3.inheritsSet = VisualStudio
+		$3.inheritsScope = text/plain
+		$3.scope = text/plain
 	EndGlobalSection
 EndGlobal