You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/27 01:22:00 UTC
svn commit: r1545889 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk:
./ src/main/csharp/ src/main/csharp/Protocol/ src/main/csharp/Transport/
src/test/csharp/
Author: tabish
Date: Wed Nov 27 00:21:59 2013
New Revision: 1545889
URL: http://svn.apache.org/r1545889
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Nov 27 00:21:59 2013
@@ -3,3 +3,4 @@ build
lib
vs2008-mqtt.userprefs
bin
+test-results
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Wed Nov 27 00:21:59 2013
@@ -548,7 +548,11 @@ namespace Apache.NMS.MQTT
else
{
ErrorResponse error = response as ErrorResponse;
- NMSException exception = error.Error;
+ Exception exception = error.Error;
+ if (!(exception is NMSException))
+ {
+ exception = new NMSException(exception.Message);
+ }
// This is non-recoverable.
// Shutdown the transport connection, and re-create it, but don't start it.
// It will be started if the connection is re-attempted.
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Protocol/MQTTWireFormat.cs Wed Nov 27 00:21:59 2013
@@ -32,6 +32,10 @@ namespace Apache.NMS.MQTT.Protocol
public void Marshal(Command cmd, BinaryWriter ds)
{
+ Tracer.DebugFormat("MQTT Command being sent: {0}", cmd);
+
+ Console.WriteLine("MQTT Command being sent: {0}", cmd);
+
MemoryStream buffer = new MemoryStream();
EndianBinaryWriter writer = new EndianBinaryWriter(buffer);
@@ -41,6 +45,7 @@ namespace Apache.NMS.MQTT.Protocol
ds.Write(fixedHeader);
WriteLength((int)buffer.Length, ds);
ds.Write(buffer.GetBuffer(), 0, (int) buffer.Length);
+ ds.Flush();
}
public Command Unmarshal(BinaryReader dis)
@@ -67,16 +72,24 @@ namespace Apache.NMS.MQTT.Protocol
cmd.Decode(reader);
}
- // A CONNACK is a response, but if it has an error code, then we create a suitable
- // ErrorResponse here with the correct NMSException in its payload.
- if (cmd.IsCONNACK && cmd.IsErrorResponse)
+ // A CONNACK is a response and its correlationId is always 1, but if it has an
+ // error code, then we create a suitable ErrorResponse here with the correct
+ // NMSException in its payload.
+ if (cmd.IsCONNACK)
{
- CONNACK connAck = cmd as CONNACK;
- ErrorResponse error = new ErrorResponse();
- error.Error = MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
- cmd = error;
- }
+ CONNACK connAck = cmd as CONNACK;
+ connAck.CorrelationId = 1;
+ if (cmd.IsErrorResponse)
+ {
+ ErrorResponse error = new ErrorResponse();
+ error.Error = MQTTExceptionFactory.CreateConnectionException(connAck.ReturnCode);
+ cmd = error;
+ }
+ }
+ Tracer.DebugFormat("MQTT Command received: {0}", cmd);
+ Console.WriteLine("MQTT Command recieved: {0}", cmd);
+
return cmd;
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs Wed Nov 27 00:21:59 2013
@@ -43,11 +43,13 @@ namespace Apache.NMS.MQTT.Transport
short CommandId
{
get;
+ set;
}
bool ResponseRequired
{
get;
+ set;
}
bool IsResponse
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ErrorResponse.cs Wed Nov 27 00:21:59 2013
@@ -21,8 +21,8 @@ namespace Apache.NMS.MQTT.Transport
{
public class ErrorResponse : Response
{
- private NMSException error;
- public NMSException Error
+ private Exception error;
+ public Exception Error
{
get { return error; }
set { this.error = value; }
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Wed Nov 27 00:21:59 2013
@@ -27,7 +27,7 @@ namespace Apache.NMS.MQTT.Transport
public class ResponseCorrelator : TransportFilter
{
private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
- private int nextCommandId;
+ private int nextCommandId = 1; // 1 is always CONNECT -> CONNACK
private Exception error;
public ResponseCorrelator(ITransport next) : base(next)
@@ -40,24 +40,34 @@ namespace Apache.NMS.MQTT.Transport
base.OnException(sender, command);
}
- internal int GetNextCommandId()
+ internal short GetNextCommandId()
{
- return Interlocked.Increment(ref nextCommandId);
+ if (nextCommandId == UInt16.MaxValue)
+ {
+ nextCommandId = 1;
+ }
+ return (short) Interlocked.Increment(ref nextCommandId);
}
public override void Oneway(Command command)
{
-// command.CommandId = GetNextCommandId();
-// command.ResponseRequired = false;
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = false;
next.Oneway(command);
}
public override FutureResponse AsyncRequest(Command command)
{
- int commandId = GetNextCommandId();
-
-// command.CommandId = commandId;
-// command.ResponseRequired = true;
+ Tracer.DebugFormat("ResponseCorrelator requesting: {0}", command);
+ if (command.IsCONNECT)
+ {
+ command.CommandId = 1;
+ }
+ else
+ {
+ command.CommandId = GetNextCommandId();
+ }
+ command.ResponseRequired = true;
FutureResponse future = new FutureResponse();
Exception priorError = null;
lock(requestMap.SyncRoot)
@@ -65,17 +75,15 @@ namespace Apache.NMS.MQTT.Transport
priorError = this.error;
if(priorError == null)
{
- requestMap[commandId] = future;
+ requestMap[command.CommandId] = future;
}
}
if(priorError != null)
{
-// BrokerError brError = new BrokerError();
-// brError.Message = priorError.Message;
-// ExceptionResponse response = new ExceptionResponse();
-// response.Exception = brError;
-// future.Response = response;
+ ErrorResponse response = new ErrorResponse();
+ response.Error = priorError;
+ future.Response = response;
return future;
}
@@ -143,11 +151,9 @@ namespace Apache.NMS.MQTT.Transport
{
foreach(FutureResponse future in requests)
{
-// BrokerError brError = new BrokerError();
-// brError.Message = error.Message;
-// ExceptionResponse response = new ExceptionResponse();
-// response.Exception = brError;
-// future.Response = response;
+ ErrorResponse response = new ErrorResponse();
+ response.Error = error;
+ future.Response = response;
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/ConnectionFactoryTest.cs Wed Nov 27 00:21:59 2013
@@ -25,24 +25,24 @@ namespace Apache.NMS.MQTT.Test
[TestFixture]
public class ConnectionFactoryTest
{
-// [Test]
-// [TestCase("mqtt:tcp://${activemqhost}:61613")]
-// [TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
-// [TestCase("stomp:failover:(tcp://${activemqhost}:61616?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
-// [TestCase("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
-// [TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("stomp:tcp://InvalidHost:61613?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("ftp://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("http://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("discovery://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("sms://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Test]
+ [TestCase("tcp://${activemqhost}:1883")]
+// [TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)")]
+// [TestCase("stomp:failover:(tcp://${activemqhost}:1883?keepAlive=false&wireFormat.maxInactivityDuration=1000)?connection.asyncSend=false")]
+// [TestCase("stomp:tcp://${activemqhost}:1883?connection.asyncsend=false")]
+// [TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://${activemqhost}:1883?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:(tcp://${activemqhost}:1883)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://InvalidHost:1883", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("stomp:tcp://InvalidHost:1883?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("ftp://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("http://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("discovery://${activemqhost}:1888", ExpectedException = typeof(NMSConnectionException))]
+// [TestCase("sms://${activemqhost}:1883", ExpectedException = typeof(NMSConnectionException))]
// [TestCase("stomp:multicast://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
-// [TestCase("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)", ExpectedException = typeof(UriFormatException))]
-// [TestCase("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613", ExpectedException = typeof(UriFormatException))]
+// [TestCase("(tcp://${activemqhost}:1883,tcp://${activemqhost}:1883)", ExpectedException = typeof(UriFormatException))]
+// [TestCase("tcp://${activemqhost}:1883,tcp://${activemqhost}:1883", ExpectedException = typeof(UriFormatException))]
public void TestURI(string connectionURI)
{
IConnectionFactory factory = new ConnectionFactory(
@@ -53,6 +53,23 @@ namespace Apache.NMS.MQTT.Test
Assert.IsNotNull(connection);
}
}
+
+ [Test]
+ [TestCase("tcp://${activemqhost}:1883")]
+ public void TestConnectionStarts(string connectionURI)
+ {
+ NMS.Tracer.Trace = new NmsConsoleTracer();
+ IConnectionFactory factory = new ConnectionFactory(
+ NMSTestSupport.ReplaceEnvVar(connectionURI));
+ Assert.IsNotNull(factory);
+ using(IConnection connection = factory.CreateConnection("", ""))
+ {
+ Assert.IsNotNull(connection);
+ // This should trigger a CONNECT frame and CONNACK response.
+ connection.Start();
+ }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt-tests.csproj Wed Nov 27 00:21:59 2013
@@ -6,7 +6,7 @@
<ProductVersion>10.0.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{186FAC43-F3B0-4B03-82DA-EEC0169307B7}</ProjectGuid>
- <OutputType>Exe</OutputType>
+ <OutputType>Library</OutputType>
<RootNamespace>vs2008mqtttests</RootNamespace>
<AssemblyName>vs2008-mqtt-tests</AssemblyName>
</PropertyGroup>
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln?rev=1545889&r1=1545888&r2=1545889&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.sln Wed Nov 27 00:21:59 2013
@@ -21,6 +21,19 @@ Global
{AEBC857B-D693-4833-9F1E-F6A22787D0C9}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
- StartupItem = vs2008-mqtt.csproj
+ StartupItem = vs2008-mqtt-tests.csproj
+ Policies = $0
+ $0.TextStylePolicy = $1
+ $1.inheritsSet = null
+ $1.scope = text/x-csharp
+ $0.CSharpFormattingPolicy = $2
+ $2.inheritsSet = Mono
+ $2.inheritsScope = text/x-csharp
+ $2.scope = text/x-csharp
+ $0.TextStylePolicy = $3
+ $3.FileWidth = 120
+ $3.inheritsSet = VisualStudio
+ $3.inheritsScope = text/plain
+ $3.scope = text/plain
EndGlobalSection
EndGlobal