You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2009/05/28 07:19:08 UTC
svn commit: r779436 - in /activemq/activemq-dotnet:
Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS/trunk/src/test/csharp/
Author: jgomes
Date: Thu May 28 05:19:07 2009
New Revision: 779436
URL: http://svn.apache.org/viewvc?rev=779436&view=rev
Log:
Fixed locking of 'this' to use separate locking object.
Fixed connection terminated logic to protect checking with locking objects. There was a race condition between the time of the connection check and then the actual use of that connection.
Fixed acknowledgement logic for AcknowledgementMode.DupsOkAcknowledge.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Thu May 28 05:19:07 2009
@@ -30,7 +30,6 @@
readonly Object semaphore = new Object();
readonly ArrayList messagesToRedeliver = new ArrayList();
- // TODO can't use EventWaitHandle on MONO 1.0
AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
bool m_bAsyncDelivery = false;
bool m_bClosed = false;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs Thu May 28 05:19:07 2009
@@ -27,6 +27,7 @@
public delegate void ExceptionHandler(Exception exception);
private readonly AutoResetEvent m_event = new AutoResetEvent(false);
+ private object initobjectLock = new object();
private bool m_bStopFlag = false;
private Thread m_thread = null;
private readonly DispatchFunction m_dispatchFunc;
@@ -37,7 +38,6 @@
m_dispatchFunc = dispatchFunc;
}
- // TODO can't use EventWaitHandle on MONO 1.0
public AutoResetEvent EventHandle
{
get { return m_event; }
@@ -51,7 +51,7 @@
internal void Start()
{
- lock(this)
+ lock(initobjectLock)
{
if(m_thread == null)
{
@@ -74,7 +74,7 @@
{
Tracer.Info("Stopping dispatcher thread for session");
Thread localThread = null;
- lock(this)
+ lock(initobjectLock)
{
localThread = m_thread;
m_thread = null;
@@ -104,7 +104,7 @@
{
while(true) // loop forever (well, at least until we've been asked to stop)
{
- lock(this)
+ lock(initobjectLock)
{
if(m_bStopFlag)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu May 28 05:19:07 2009
@@ -35,7 +35,8 @@
public class MessageConsumer : IMessageConsumer
{
private readonly AcknowledgementMode acknowledgementMode;
- private AtomicBoolean closed = new AtomicBoolean( false );
+ private bool closed = false;
+ private object closedLock = new object();
private readonly Dispatcher dispatcher = new Dispatcher();
private readonly ConsumerInfo info;
private int maximumRedeliveryCount = 10;
@@ -53,7 +54,7 @@
this.acknowledgementMode = acknowledgementMode;
if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode)
{
- this.ackSession = (Session) session.Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ this.ackSession = (Session) session.Connection.CreateSession(acknowledgementMode);
}
}
@@ -99,21 +100,18 @@
public IMessage Receive()
{
- CheckClosed();
SendPullRequest(0);
return SetupAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(System.TimeSpan timeout)
{
- CheckClosed();
SendPullRequest((long) timeout.TotalMilliseconds);
return SetupAcknowledge(dispatcher.Dequeue(timeout));
}
public IMessage ReceiveNoWait()
{
- CheckClosed();
SendPullRequest(-1);
return SetupAcknowledge(dispatcher.DequeueNoWait());
}
@@ -150,9 +148,9 @@
public void Close()
{
- lock(this)
+ lock(closedLock)
{
- if(closed.Value)
+ if(closed)
{
return;
}
@@ -175,7 +173,7 @@
session = null;
ackSession = null;
- closed.Value = true;
+ closed = true;
}
}
@@ -194,13 +192,19 @@
/// <param name="message">An ActiveMQMessage</param>
public void Dispatch(ActiveMQMessage message)
{
- lock(this)
+ if(AcknowledgementMode.AutoAcknowledge == this.acknowledgementMode)
{
- if(ackSession != null)
+ MessageAck ack = CreateMessageAck(message);
+ Tracer.Debug("Sending AutoAck: " + ack);
+ message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+
+ lock(closedLock)
{
- message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
- MessageAck ack = CreateMessageAck(message);
- Tracer.Debug("Sending AutoAck: " + ack);
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
ackSession.Connection.OneWay(ack);
}
}
@@ -227,14 +231,6 @@
}
}
- protected void CheckClosed()
- {
- if(closed.Value)
- {
- throw new ConnectionClosedException();
- }
- }
-
protected IMessage SetupAcknowledge(IMessage message)
{
if(null == message)
@@ -244,7 +240,7 @@
if(message is ActiveMQMessage)
{
- ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+ ActiveMQMessage activeMessage = (ActiveMQMessage) message;
if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode)
{
@@ -259,20 +255,26 @@
return message;
}
-
- protected void SendPullRequest( long timeout )
- {
- CheckClosed();
+ protected void SendPullRequest(long timeout)
+ {
if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty())
{
MessagePull messagePull = new MessagePull();
messagePull.ConsumerId = this.info.ConsumerId;
- messagePull.Destination = this.info.Destination;
- messagePull.Timeout = timeout;
+ messagePull.Destination = this.info.Destination;
+ messagePull.Timeout = timeout;
Tracer.Debug("Sending MessagePull: " + messagePull);
- session.Connection.OneWay(messagePull);
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ session.Connection.OneWay(messagePull);
+ }
}
}
@@ -284,7 +286,15 @@
{
MessageAck ack = CreateMessageAck(message);
Tracer.Debug("Sending Ack: " + ack);
- session.Connection.OneWay(ack);
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ session.Connection.OneWay(ack);
+ }
}
protected virtual MessageAck CreateMessageAck(Message message)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Thu May 28 05:19:07 2009
@@ -28,6 +28,7 @@
{
private Session session;
private bool closed = false;
+ private object closedLock = new object();
private readonly ProducerInfo info;
private int messageCounter = 0;
@@ -83,7 +84,7 @@
public void Close()
{
- lock(this)
+ lock(closedLock)
{
if(closed)
{
@@ -104,17 +105,6 @@
}
}
- protected void CheckClosed()
- {
- lock(this)
- {
- if(closed)
- {
- throw new ConnectionClosedException();
- }
- }
- }
-
public void Send(IMessage message)
{
Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
@@ -150,7 +140,6 @@
throw new Apache.NMS.InvalidDestinationException();
}
- CheckClosed();
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
if(!disableMessageID)
@@ -166,12 +155,6 @@
activeMessage.NMSDeliveryMode = deliveryMode;
activeMessage.NMSPriority = priority;
- if(session.Transacted)
- {
- session.DoStartTransaction();
- activeMessage.TransactionId = session.TransactionContext.TransactionId;
- }
-
if(!disableMessageTimestamp)
{
activeMessage.NMSTimestamp = DateTime.UtcNow;
@@ -182,7 +165,21 @@
activeMessage.NMSTimeToLive = timeToLive;
}
- session.DoSend(activeMessage, this.RequestTimeout);
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if(session.Transacted)
+ {
+ session.DoStartTransaction();
+ activeMessage.TransactionId = session.TransactionContext.TransactionId;
+ }
+
+ session.DoSend(activeMessage, this.RequestTimeout);
+ }
}
public MsgDeliveryMode DeliveryMode
@@ -223,43 +220,36 @@
public IMessage CreateMessage()
{
- CheckClosed();
return session.CreateMessage();
}
public ITextMessage CreateTextMessage()
{
- CheckClosed();
return session.CreateTextMessage();
}
public ITextMessage CreateTextMessage(string text)
{
- CheckClosed();
return session.CreateTextMessage(text);
}
public IMapMessage CreateMapMessage()
{
- CheckClosed();
return session.CreateMapMessage();
}
public IObjectMessage CreateObjectMessage(object body)
{
- CheckClosed();
return session.CreateObjectMessage(body);
}
public IBytesMessage CreateBytesMessage()
{
- CheckClosed();
return session.CreateBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
- CheckClosed();
return session.CreateBytesMessage(body);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs Thu May 28 05:19:07 2009
@@ -67,10 +67,12 @@
{
IMessage msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive first durable message.");
- SendPersistentMessage();
+ msg.Acknowledge();
+ SendPersistentMessage();
msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive second durable message.");
+ msg.Acknowledge();
}
}
}
@@ -110,10 +112,12 @@
{
IMessage msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive first durable transactional message.");
+ msg.Acknowledge();
SendPersistentMessage();
msg = consumer.Receive(receiveTimeout);
Assert.IsNotNull(msg, "Did not receive second durable transactional message.");
+ msg.Acknowledge();
session.Commit();
}
}
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs?rev=779436&r1=779435&r2=779436&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs Thu May 28 05:19:07 2009
@@ -252,6 +252,7 @@
Assert.IsNotNull(destinationTopic, "Could not get destination topic.");
using(IMessageConsumer consumer = session.CreateDurableConsumer(destinationTopic, consumerID, selector, noLocal))
{
+ Assert.IsNotNull(consumer, "Could not create durable consumer.");
}
}
}