You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/03 23:04:34 UTC

svn commit: r1564097 - in /activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp: DefaultMessageConverter.cs MessageConsumer.cs

Author: tabish
Date: Mon Feb  3 22:04:34 2014
New Revision: 1564097

URL: http://svn.apache.org/r1564097
Log:
https://issues.apache.org/jira/browse/AMQNET-454

applied:
https://issues.apache.org/jira/secure/attachment/12626743/Apache.NMS.AMQP-fix-replyTo-and-receive-timeouts-16.patch

Modified:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1564097&r1=1564096&r2=1564097&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Feb  3 22:04:34 2014
@@ -137,7 +137,7 @@ namespace Apache.NMS.Amqp
 
         #region Duration Methods
         //
-        private static Duration ToQpidDuration(TimeSpan timespan)
+        public static Duration ToQpidDuration(TimeSpan timespan)
         {
             if (timespan.TotalMilliseconds <= 0)
             {
@@ -157,7 +157,7 @@ namespace Apache.NMS.Amqp
         }
 
         //
-        private static TimeSpan ToNMSTimespan(Duration duration)
+        public static TimeSpan ToNMSTimespan(Duration duration)
         {
             if (duration.Milliseconds > Int64.MaxValue)
             {

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1564097&r1=1564096&r2=1564097&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Feb  3 22:04:34 2014
@@ -37,6 +37,7 @@ namespace Apache.NMS.Amqp
         private readonly Session session;
         private readonly int id;
         private readonly Destination destination;
+        private Destination replyToDestination;
         private readonly AcknowledgementMode acknowledgementMode;
         private event MessageListener listener;
         private int listenerCount = 0;
@@ -65,7 +66,7 @@ namespace Apache.NMS.Amqp
         #region IStartable Methods
         public void Start()
         {
-            // Don't try creating session if connection not yet up
+            // Don't try creating receiver if session not yet up
             if (!session.IsStarted)
             {
                 throw new SessionClosedException();
@@ -75,11 +76,24 @@ namespace Apache.NMS.Amqp
             {
                 try
                 {
-                    // Create qpid sender
+                    // Create qpid receiver
                     Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString());
                     if (qpidReceiver == null)
                     {
                         qpidReceiver = session.CreateQpidReceiver(destination.Address);
+                        // Recover replyTo address from qpid receiver and set as the
+                        // replyTo destination for received messages.
+                        Address replyTo = qpidReceiver.GetAddress();
+                        if (destination.IsQueue)
+                        {
+                            Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options);
+                            replyToDestination = (Destination)queue;
+                        }
+                        else if (destination.IsTopic)
+                        {
+                            Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options);
+                            replyToDestination = (Destination)topic;
+                        }
                     }
                 }
                 catch (Org.Apache.Qpid.Messaging.QpidException e)
@@ -137,36 +151,56 @@ namespace Apache.NMS.Amqp
             }
         }
 
+
+        /// <summary>
+        /// Fetch a message from Qpid Receiver.
+        /// Will wait FOREVER.
+        /// </summary>
+        /// <returns>NMS message or null if Fetch fails</returns>
         public IMessage Receive()
         {
-            IMessage nmsMessage = null;
-
-            Message qpidMessage = new Message();
-            if (qpidReceiver.Fetch(ref qpidMessage))
-            {
-                nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
-            }
-            return nmsMessage;
+            return ReceiveQpid(DurationConstants.FORVER);
         }
 
+
+        /// <summary>
+        /// Fetch a message from Qpid Receiver.
+        /// Will wait for given timespan before abandoning the Fetch.
+        /// </summary>
+        /// <param name="timeout">Time span to wait for a message before abandoning the Fetch.</param>
+        /// <returns>NMS message or null if Fetch fails or times out</returns>
         public IMessage Receive(TimeSpan timeout)
         {
-            IMessage nmsMessage = null;
+            return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout));
+        }
 
-            // TODO: Receive a message
 
-            return nmsMessage;
+        /// <summary>
+        /// Fetch a message from Qpid Receiver.
+        /// Returns from the Fetch immediately.
+        /// </summary>
+        /// <returns>NMS message or null if none was pending</returns>
+        public IMessage ReceiveNoWait()
+        {
+            return ReceiveQpid(DurationConstants.IMMEDIATE);
         }
 
-        public IMessage ReceiveNoWait()
+
+
+        private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout)
         {
             IMessage nmsMessage = null;
 
-            // TODO: Receive a message
-
+            Message qpidMessage = new Message();
+            if (qpidReceiver.Fetch(ref qpidMessage, timeout))
+            {
+                nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
+                nmsMessage.NMSReplyTo = replyToDestination;
+            }
             return nmsMessage;
         }
 
+        
         public void Dispose()
         {
             Close();
@@ -202,7 +236,7 @@ namespace Apache.NMS.Amqp
             if(asyncDelivery.CompareAndSet(false, true))
             {
                 asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + "TODO: unique name";
+                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString();
                 asyncDeliveryThread.IsBackground = true;
                 asyncDeliveryThread.Start();
             }