You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2014/10/09 03:38:53 UTC
svn commit: r1630267 - in
/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp:
BaseMessage.cs Destination.cs MessageConsumer.cs MessageProducer.cs
TemporaryQueue.cs TemporaryTopic.cs
Author: jgomes
Date: Thu Oct 9 01:38:53 2014
New Revision: 1630267
URL: http://svn.apache.org/r1630267
Log:
Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)
Modified:
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Thu Oct 9 01:38:53 2014
@@ -36,12 +36,19 @@ namespace Apache.NMS.ZMQ
private string type;
private event AcknowledgeHandler Acknowledger;
private DateTime timestamp = new DateTime();
+ private bool readOnlyMsgProperties = false;
private bool readOnlyMsgBody = false;
- public bool ReadOnlyBody
+ public virtual bool ReadOnlyProperties
{
- get { return readOnlyMsgBody; }
- set { readOnlyMsgBody = value; }
+ get { return this.readOnlyMsgProperties; }
+ set { this.readOnlyMsgProperties = value; }
+ }
+
+ public virtual bool ReadOnlyBody
+ {
+ get { return this.readOnlyMsgBody; }
+ set { this.readOnlyMsgBody = value; }
}
// IMessage interface
@@ -155,7 +162,6 @@ namespace Apache.NMS.ZMQ
set { }
}
-
/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
@@ -190,7 +196,6 @@ namespace Apache.NMS.ZMQ
set { type = value; }
}
-
public object GetObjectProperty(string name)
{
return null;
@@ -200,9 +205,15 @@ namespace Apache.NMS.ZMQ
{
}
+ public virtual void OnSend()
+ {
+ this.ReadOnlyProperties = true;
+ this.ReadOnlyBody = true;
+ }
+
protected void FailIfReadOnlyBody()
{
- if(ReadOnlyBody == true)
+ if(ReadOnlyBody)
{
throw new MessageNotWriteableException("Message is in Read-Only mode.");
}
@@ -210,7 +221,7 @@ namespace Apache.NMS.ZMQ
protected void FailIfWriteOnlyBody()
{
- if(ReadOnlyBody == false)
+ if(!ReadOnlyBody)
{
throw new MessageNotReadableException("Message is in Write-Only mode.");
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Thu Oct 9 01:38:53 2014
@@ -36,6 +36,7 @@ namespace Apache.NMS.ZMQ
protected ZmqSocket producerEndpoint = null;
protected ZmqSocket consumerEndpoint = null;
protected string destinationName;
+ internal byte[] rawDestinationName;
private bool disposed = false;
@@ -47,6 +48,7 @@ namespace Apache.NMS.ZMQ
{
this.session = session;
this.destinationName = destName;
+ this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName);
this.session.RegisterDestination(this);
}
@@ -88,23 +90,8 @@ namespace Apache.NMS.ZMQ
/// </summary>
protected virtual void OnDispose()
{
- if(null != this.producerEndpoint)
- {
- if(null != this.session
- && null != this.session.Connection)
- {
- this.session.Connection.ReleaseProducer(this.producerEndpoint);
- }
-
- this.producerEndpoint = null;
- }
-
- if(null != this.consumerEndpoint)
- {
- this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
- this.consumerEndpoint = null;
- }
-
+ DeinitSender();
+ DeinitReceiver();
this.session.UnregisterDestination(this);
}
@@ -190,6 +177,20 @@ namespace Apache.NMS.ZMQ
}
}
+ internal void DeinitSender()
+ {
+ if(null != this.producerEndpoint)
+ {
+ if(null != this.session
+ && null != this.session.Connection)
+ {
+ this.session.Connection.ReleaseProducer(this.producerEndpoint);
+ }
+
+ this.producerEndpoint = null;
+ }
+ }
+
internal void InitReceiver()
{
if(null == this.consumerEndpoint)
@@ -198,34 +199,27 @@ namespace Apache.NMS.ZMQ
this.consumerEndpoint = connection.GetConsumer();
// Must subscribe first before connecting to the endpoint binding
- this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+ this.consumerEndpoint.Subscribe(this.rawDestinationName);
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
}
}
- internal void Subscribe(string prefixName)
- {
- InitReceiver();
- this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
- }
-
- internal void Unsubscribe(string prefixName)
+ internal void DeinitReceiver()
{
if(null != this.consumerEndpoint)
{
- this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+ this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+ this.consumerEndpoint = null;
}
}
internal SendStatus Send(string msg)
{
- Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(msg, Destination.encoding);
}
internal SendStatus Send(byte[] buffer)
{
- Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(buffer);
}
@@ -246,20 +240,6 @@ namespace Apache.NMS.ZMQ
this.InitReceiver();
return this.consumerEndpoint.Receive(null, flags, out size);
}
-
- internal Frame ReceiveFrame()
- {
- // TODO: Implement
- this.InitReceiver();
- return null;
- }
-
- internal ZmqMessage ReceiveMessage()
- {
- // TODO: Implement
- this.InitReceiver();
- return null;
- }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Oct 9 01:38:53 2014
@@ -40,7 +40,6 @@ namespace Apache.NMS.ZMQ
private object asyncDeliveryLock = new object();
private bool asyncDelivery = false;
private bool asyncInit = false;
- private byte[] rawDestinationName;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -82,7 +81,6 @@ namespace Apache.NMS.ZMQ
this.session = sess;
this.destination = theDest;
- this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}
@@ -145,7 +143,7 @@ namespace Apache.NMS.ZMQ
if(size > 0)
{
// Strip off the subscribed destination name.
- int receivedMsgIndex = this.rawDestinationName.Length;
+ int receivedMsgIndex = this.destination.rawDestinationName.Length;
int msgLength = receivedMsg.Length - receivedMsgIndex;
byte[] msgContent = new byte[msgLength];
@@ -406,6 +404,14 @@ namespace Apache.NMS.ZMQ
}
break;
+ case WireFormat.MT_BYTESMESSAGE:
+ nmsMessage = new BytesMessage();
+ if(null != messageBody)
+ {
+ ((BytesMessage) nmsMessage).Content = messageBody;
+ }
+ break;
+
case WireFormat.MT_UNKNOWN:
default:
break;
@@ -444,6 +450,9 @@ namespace Apache.NMS.ZMQ
nmsMessage = transformedMessage as BaseMessage;
}
}
+
+ nmsMessage.ReadOnlyBody = true;
+ nmsMessage.ReadOnlyProperties = true;
}
return nmsMessage;
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Thu Oct 9 01:38:53 2014
@@ -19,8 +19,8 @@
using System;
using System.Collections.Generic;
-using System.Text;
using System.Net;
+using System.Text;
using Apache.NMS.Util;
namespace Apache.NMS.ZMQ
@@ -48,13 +48,35 @@ namespace Apache.NMS.ZMQ
public MessageProducer(Session sess, IDestination dest)
{
- if(null == sess.Connection.Context)
+ if(null == sess
+ || null == sess.Connection
+ || null == sess.Connection.Context)
{
throw new NMSConnectionException();
}
+ Destination theDest = dest as Destination;
+
+ if(null == theDest)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+ else if(null == theDest.Name)
+ {
+ throw new InvalidDestinationException("The destination object was not given a physical name.");
+ }
+ else if(theDest.IsTemporary)
+ {
+ String physicalName = theDest.Name;
+
+ if(String.IsNullOrEmpty(physicalName))
+ {
+ throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);
+ }
+ }
+
this.session = sess;
- this.destination = (Destination) dest;
+ this.destination = theDest;
this.destination.InitSender();
}
@@ -150,6 +172,17 @@ namespace Apache.NMS.ZMQ
EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
}
}
+ else if(message is IBytesMessage)
+ {
+ EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);
+ // Append the message bytes body to the msg.
+ byte[] msgBody = ((IBytesMessage) message).Content;
+
+ if(null != msgBody)
+ {
+ EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
+ }
+ }
else
{
// TODO: Add support for more message types
@@ -158,6 +191,8 @@ namespace Apache.NMS.ZMQ
// Put the sentinel field marker.
EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+
+ ((BaseMessage) message).OnSend();
theDest.Send(msgDataBuilder.ToArray());
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Thu Oct 9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
public class TemporaryQueue : Destination, ITemporaryQueue
{
public TemporaryQueue(Session session)
- : base(session, Guid.NewGuid().ToString())
+ : base(session, "TEMPQUEUE." + Guid.NewGuid().ToString())
{
}
Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Thu Oct 9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
public class TemporaryTopic : Destination, ITemporaryTopic
{
public TemporaryTopic(Session session)
- : base(session, Guid.NewGuid().ToString())
+ : base(session, "TEMPTOPIC." + Guid.NewGuid().ToString())
{
}