You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2009/05/28 07:19:08 UTC

svn commit: r779436 - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS/trunk/src/test/csharp/

Author: jgomes
Date: Thu May 28 05:19:07 2009
New Revision: 779436

URL: http://svn.apache.org/viewvc?rev=779436&view=rev
Log:
Replaced locking on 'this' with dedicated private lock objects.
Fixed the connection-closed logic so that the closed check and the subsequent use of the connection are performed under the same lock.  Previously there was a race condition between the time the connection was checked and the time it was actually used.
Fixed acknowledgement logic for AcknowledgementMode.DupsOkAcknowledge.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Thu May 28 05:19:07 2009
@@ -30,7 +30,6 @@
 		readonly Object semaphore = new Object();
 		readonly ArrayList messagesToRedeliver = new ArrayList();
         
-        // TODO can't use EventWaitHandle on MONO 1.0
         AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
         bool m_bAsyncDelivery = false;
         bool m_bClosed = false;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs Thu May 28 05:19:07 2009
@@ -27,6 +27,7 @@
 		public delegate void ExceptionHandler(Exception exception);
 
 		private readonly AutoResetEvent m_event = new AutoResetEvent(false);
+		private object initobjectLock = new object();
 		private bool m_bStopFlag = false;
 		private Thread m_thread = null;
 		private readonly DispatchFunction m_dispatchFunc;
@@ -37,7 +38,6 @@
 			m_dispatchFunc = dispatchFunc;
 		}
 
-		// TODO can't use EventWaitHandle on MONO 1.0
 		public AutoResetEvent EventHandle
 		{
 			get { return m_event; }
@@ -51,7 +51,7 @@
 
 		internal void Start()
 		{
-			lock(this)
+			lock(initobjectLock)
 			{
 				if(m_thread == null)
 				{
@@ -74,7 +74,7 @@
 		{
 			Tracer.Info("Stopping dispatcher thread for session");
 			Thread localThread = null;
-			lock(this)
+			lock(initobjectLock)
 			{
 				localThread = m_thread;
 				m_thread = null;
@@ -104,7 +104,7 @@
 			{
 				while(true) // loop forever (well, at least until we've been asked to stop)
 				{
-					lock(this)
+					lock(initobjectLock)
 					{
 						if(m_bStopFlag)
 						{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu May 28 05:19:07 2009
@@ -35,7 +35,8 @@
 	public class MessageConsumer : IMessageConsumer
 	{
 		private readonly AcknowledgementMode acknowledgementMode;
-		private AtomicBoolean closed = new AtomicBoolean( false );
+		private bool closed = false;
+		private object closedLock = new object();
 		private readonly Dispatcher dispatcher = new Dispatcher();
 		private readonly ConsumerInfo info;
 		private int maximumRedeliveryCount = 10;
@@ -53,7 +54,7 @@
 			this.acknowledgementMode = acknowledgementMode;
 			if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode)
 			{
-				this.ackSession = (Session) session.Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+				this.ackSession = (Session) session.Connection.CreateSession(acknowledgementMode);
 			}
 		}
 
@@ -99,21 +100,18 @@
 
 		public IMessage Receive()
 		{
-			CheckClosed();
 			SendPullRequest(0);
 			return SetupAcknowledge(dispatcher.Dequeue());
 		}
 
 		public IMessage Receive(System.TimeSpan timeout)
 		{
-			CheckClosed();
 			SendPullRequest((long) timeout.TotalMilliseconds);
 			return SetupAcknowledge(dispatcher.Dequeue(timeout));
 		}
 
 		public IMessage ReceiveNoWait()
 		{
-			CheckClosed();
 			SendPullRequest(-1);
 			return SetupAcknowledge(dispatcher.DequeueNoWait());
 		}
@@ -150,9 +148,9 @@
 
 		public void Close()
 		{
-			lock(this)
+			lock(closedLock)
 			{
-				if(closed.Value)
+				if(closed)
 				{
 					return;
 				}
@@ -175,7 +173,7 @@
 
 				session = null;
 				ackSession = null;
-				closed.Value = true;
+				closed = true;
 			}
 		}
 
@@ -194,13 +192,19 @@
 		/// <param name="message">An ActiveMQMessage</param>
 		public void Dispatch(ActiveMQMessage message)
 		{
-			lock(this)
+			if(AcknowledgementMode.AutoAcknowledge == this.acknowledgementMode)
 			{
-				if(ackSession != null)
+				MessageAck ack = CreateMessageAck(message);
+				Tracer.Debug("Sending AutoAck: " + ack);
+				message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+
+				lock(closedLock)
 				{
-					message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-					MessageAck ack = CreateMessageAck(message);
-					Tracer.Debug("Sending AutoAck: " + ack);
+					if(closed)
+					{
+						throw new ConnectionClosedException();
+					}
+
 					ackSession.Connection.OneWay(ack);
 				}
 			}
@@ -227,14 +231,6 @@
 			}
 		}
 
-		protected void CheckClosed()
-		{
-			if(closed.Value)
-			{
-				throw new ConnectionClosedException();
-			}
-		}
-
 		protected IMessage SetupAcknowledge(IMessage message)
 		{
 			if(null == message)
@@ -244,7 +240,7 @@
 
 			if(message is ActiveMQMessage)
 			{
-				ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+				ActiveMQMessage activeMessage = (ActiveMQMessage) message;
 
 				if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode)
 				{
@@ -259,20 +255,26 @@
 
 			return message;
 		}
-		
-		protected void SendPullRequest( long timeout ) 
-		{
-            CheckClosed();
 
+		protected void SendPullRequest(long timeout)
+		{
 			if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty())
 			{
 				MessagePull messagePull = new MessagePull();
 				messagePull.ConsumerId = this.info.ConsumerId;
-                messagePull.Destination = this.info.Destination;
-                messagePull.Timeout = timeout;
+				messagePull.Destination = this.info.Destination;
+				messagePull.Timeout = timeout;
 
 				Tracer.Debug("Sending MessagePull: " + messagePull);
-				session.Connection.OneWay(messagePull);
+				lock(closedLock)
+				{
+					if(closed)
+					{
+						throw new ConnectionClosedException();
+					}
+
+					session.Connection.OneWay(messagePull);
+				}
 			}
 		}
 
@@ -284,7 +286,15 @@
 		{
 			MessageAck ack = CreateMessageAck(message);
 			Tracer.Debug("Sending Ack: " + ack);
-			session.Connection.OneWay(ack);
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					throw new ConnectionClosedException();
+				}
+
+				session.Connection.OneWay(ack);
+			}
 		}
 
 		protected virtual MessageAck CreateMessageAck(Message message)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Thu May 28 05:19:07 2009
@@ -28,6 +28,7 @@
 	{
 		private Session session;
 		private bool closed = false;
+        private object closedLock = new object();
 		private readonly ProducerInfo info;
 		private int messageCounter = 0;
 
@@ -83,7 +84,7 @@
 
 		public void Close()
 		{
-			lock(this)
+			lock(closedLock)
 			{
 				if(closed)
 				{
@@ -104,17 +105,6 @@
 			}
 		}
 
-		protected void CheckClosed()
-		{
-			lock(this)
-			{
-				if(closed)
-				{
-					throw new ConnectionClosedException();
-				}
-			}
-		}
-
 		public void Send(IMessage message)
 		{
 			Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
@@ -150,7 +140,6 @@
 				throw new Apache.NMS.InvalidDestinationException();
 			}
 
-			CheckClosed();
 			ActiveMQMessage activeMessage = (ActiveMQMessage) message;
 
 			if(!disableMessageID)
@@ -166,12 +155,6 @@
 			activeMessage.NMSDeliveryMode = deliveryMode;
 			activeMessage.NMSPriority = priority;
 
-			if(session.Transacted)
-			{
-				session.DoStartTransaction();
-				activeMessage.TransactionId = session.TransactionContext.TransactionId;
-			}
-
 			if(!disableMessageTimestamp)
 			{
 				activeMessage.NMSTimestamp = DateTime.UtcNow;
@@ -182,7 +165,21 @@
 				activeMessage.NMSTimeToLive = timeToLive;
 			}
 
-			session.DoSend(activeMessage, this.RequestTimeout);
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					throw new ConnectionClosedException();
+				}
+
+				if(session.Transacted)
+				{
+					session.DoStartTransaction();
+					activeMessage.TransactionId = session.TransactionContext.TransactionId;
+				}
+
+				session.DoSend(activeMessage, this.RequestTimeout);
+			}
 		}
 
 		public MsgDeliveryMode DeliveryMode
@@ -223,43 +220,36 @@
 
 		public IMessage CreateMessage()
 		{
-			CheckClosed();
 			return session.CreateMessage();
 		}
 
 		public ITextMessage CreateTextMessage()
 		{
-			CheckClosed();
 			return session.CreateTextMessage();
 		}
 
 		public ITextMessage CreateTextMessage(string text)
 		{
-			CheckClosed();
 			return session.CreateTextMessage(text);
 		}
 
 		public IMapMessage CreateMapMessage()
 		{
-			CheckClosed();
 			return session.CreateMapMessage();
 		}
 
 		public IObjectMessage CreateObjectMessage(object body)
 		{
-			CheckClosed();
 			return session.CreateObjectMessage(body);
 		}
 
 		public IBytesMessage CreateBytesMessage()
 		{
-			CheckClosed();
 			return session.CreateBytesMessage();
 		}
 
 		public IBytesMessage CreateBytesMessage(byte[] body)
 		{
-			CheckClosed();
 			return session.CreateBytesMessage(body);
 		}
 	}

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs Thu May 28 05:19:07 2009
@@ -67,10 +67,12 @@
 						{
 							IMessage msg = consumer.Receive(receiveTimeout);
 							Assert.IsNotNull(msg, "Did not receive first durable message.");
-							SendPersistentMessage();
+							msg.Acknowledge();
 
+							SendPersistentMessage();
 							msg = consumer.Receive(receiveTimeout);
 							Assert.IsNotNull(msg, "Did not receive second durable message.");
+							msg.Acknowledge();
 						}
 					}
 				}
@@ -110,10 +112,12 @@
 					{
 						IMessage msg = consumer.Receive(receiveTimeout);
 						Assert.IsNotNull(msg, "Did not receive first durable transactional message.");
+						msg.Acknowledge();
 						SendPersistentMessage();
 
 						msg = consumer.Receive(receiveTimeout);
 						Assert.IsNotNull(msg, "Did not receive second durable transactional message.");
+						msg.Acknowledge();
 						session.Commit();
 					}
 				}

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs Thu May 28 05:19:07 2009
@@ -252,6 +252,7 @@
 					Assert.IsNotNull(destinationTopic, "Could not get destination topic.");
 					using(IMessageConsumer consumer = session.CreateDurableConsumer(destinationTopic, consumerID, selector, noLocal))
 					{
+						Assert.IsNotNull(consumer, "Could not create durable consumer.");
 					}
 				}
 			}