You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/03 23:04:34 UTC
svn commit: r1564097 - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp:
DefaultMessageConverter.cs MessageConsumer.cs
Author: tabish
Date: Mon Feb 3 22:04:34 2014
New Revision: 1564097
URL: http://svn.apache.org/r1564097
Log:
https://issues.apache.org/jira/browse/AMQNET-454
applied:
https://issues.apache.org/jira/secure/attachment/12626743/Apache.NMS.AMQP-fix-replyTo-and-receive-timeouts-16.patch
Modified:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1564097&r1=1564096&r2=1564097&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Feb 3 22:04:34 2014
@@ -137,7 +137,7 @@ namespace Apache.NMS.Amqp
#region Duration Methods
//
- private static Duration ToQpidDuration(TimeSpan timespan)
+ public static Duration ToQpidDuration(TimeSpan timespan)
{
if (timespan.TotalMilliseconds <= 0)
{
@@ -157,7 +157,7 @@ namespace Apache.NMS.Amqp
}
//
- private static TimeSpan ToNMSTimespan(Duration duration)
+ public static TimeSpan ToNMSTimespan(Duration duration)
{
if (duration.Milliseconds > Int64.MaxValue)
{
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1564097&r1=1564096&r2=1564097&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Feb 3 22:04:34 2014
@@ -37,6 +37,7 @@ namespace Apache.NMS.Amqp
private readonly Session session;
private readonly int id;
private readonly Destination destination;
+ private Destination replyToDestination;
private readonly AcknowledgementMode acknowledgementMode;
private event MessageListener listener;
private int listenerCount = 0;
@@ -65,7 +66,7 @@ namespace Apache.NMS.Amqp
#region IStartable Methods
public void Start()
{
- // Don't try creating session if connection not yet up
+ // Don't try creating receiver if session not yet up
if (!session.IsStarted)
{
throw new SessionClosedException();
@@ -75,11 +76,24 @@ namespace Apache.NMS.Amqp
{
try
{
- // Create qpid sender
+ // Create qpid receiver
Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString());
if (qpidReceiver == null)
{
qpidReceiver = session.CreateQpidReceiver(destination.Address);
+ // Recover replyTo address from qpid receiver and set as the
+ // replyTo destination for received messages.
+ Address replyTo = qpidReceiver.GetAddress();
+ if (destination.IsQueue)
+ {
+ Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options);
+ replyToDestination = (Destination)queue;
+ }
+ else if (destination.IsTopic)
+ {
+ Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options);
+ replyToDestination = (Destination)topic;
+ }
}
}
catch (Org.Apache.Qpid.Messaging.QpidException e)
@@ -137,36 +151,56 @@ namespace Apache.NMS.Amqp
}
}
+
+ /// <summary>
+ /// Fetch a message from Qpid Receiver.
+ /// Will wait FOREVER.
+ /// </summary>
+ /// <returns>NMS message or null if Fetch fails</returns>
public IMessage Receive()
{
- IMessage nmsMessage = null;
-
- Message qpidMessage = new Message();
- if (qpidReceiver.Fetch(ref qpidMessage))
- {
- nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
- }
- return nmsMessage;
+ return ReceiveQpid(DurationConstants.FORVER);
}
+
+ /// <summary>
+ /// Fetch a message from Qpid Receiver
+ /// Will wait for given timespan before abandoning the Fetch.
+ /// </summary>
+ /// <param name="timeout">Timespan to wait for a message before abandoning the Fetch.</param>
+ /// <returns>NMS message or null if Fetch fails or times out</returns>
public IMessage Receive(TimeSpan timeout)
{
- IMessage nmsMessage = null;
+ return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout));
+ }
- // TODO: Receive a message
- return nmsMessage;
+ /// <summary>
+ /// Fetch a message from Qpid Receiver
+ /// Returns from the Fetch immediately.
+ /// </summary>
+ /// <returns>NMS message or null if none was pending</returns>
+ public IMessage ReceiveNoWait()
+ {
+ return ReceiveQpid(DurationConstants.IMMEDIATE);
}
- public IMessage ReceiveNoWait()
+
+
+ private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout)
{
IMessage nmsMessage = null;
- // TODO: Receive a message
-
+ Message qpidMessage = new Message();
+ if (qpidReceiver.Fetch(ref qpidMessage, timeout))
+ {
+ nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
+ nmsMessage.NMSReplyTo = replyToDestination;
+ }
return nmsMessage;
}
+
public void Dispose()
{
Close();
@@ -202,7 +236,7 @@ namespace Apache.NMS.Amqp
if(asyncDelivery.CompareAndSet(false, true))
{
asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
- asyncDeliveryThread.Name = "Message Consumer Dispatch: " + "TODO: unique name";
+ asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString();
asyncDeliveryThread.IsBackground = true;
asyncDeliveryThread.Start();
}