You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/27 19:24:36 UTC
svn commit: r468465 - in
/incubator/activemq/activemq-dotnet/trunk/src/main/csharp: ActiveMQ/
ActiveMQ/Util/ NMS/
Author: jstrachan
Date: Fri Oct 27 10:24:35 2006
New Revision: 468465
URL: http://svn.apache.org/viewvc?view=rev&rev=468465
Log:
Applied patch from Rob Lugt for AMQ-999 to fix the threading of the ActiveMQ.Net client. Many thanks Rob!
Added:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs
Modified:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Fri Oct 27 10:24:35 2006
@@ -40,6 +40,7 @@
private long temporaryDestinationCounter;
private long localTransactionCounter;
private bool closing;
+ private Util.AtomicBoolean started = new ActiveMQ.Util.AtomicBoolean(true);
public Connection(ITransport transport, ConnectionInfo info)
{
@@ -51,21 +52,44 @@
}
public event ExceptionListener ExceptionListener;
-
- /// <summary>
- /// Starts message delivery for this connection.
- /// </summary>
- public void Start()
- {
- }
-
-
- /// <summary>
- /// Stop message delivery for this connection.
- /// </summary>
- public void Stop()
- {
- }
+
+
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+
+ /// <summary>
+ /// Starts asynchronous message delivery of incoming messages for this connection.
+ /// Synchronous delivery is unaffected.
+ /// </summary>
+ public void Start()
+ {
+ CheckConnected();
+ if (started.compareAndSet(false, true))
+ {
+ foreach(Session session in sessions)
+ {
+ session.StartAsyncDelivery(null);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Temporarily stop asynchronous delivery of inbound messages for this connection.
+ /// The sending of outbound messages is unaffected.
+ /// </summary>
+ public void Stop()
+ {
+ CheckConnected();
+ if (started.compareAndSet(true, false))
+ {
+ foreach(Session session in sessions)
+ {
+ session.StopAsyncDelivery();
+ }
+ }
+ }
/// <summary>
/// Creates a new session to work on this connection
@@ -86,21 +110,38 @@
sessions.Add(session);
return session;
}
-
+
+ public void Close()
+ {
+ if (!closed)
+ {
+ closing = true;
+ foreach (Session session in sessions)
+ {
+ session.Close();
+ }
+ sessions.Clear();
+ try
+ {
+ DisposeOf(ConnectionId);
+ transport.Oneway(new ShutdownInfo());
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during connection close: {0}", ex);
+ }
+ transport.Dispose();
+ transport = null;
+ closed = true;
+ }
+ }
+
public void Dispose()
{
- /*
- foreach (Session session in sessions)
- {
- session.Dispose();
- }
- */
- closing = true;
- DisposeOf(ConnectionId);
- sessions.Clear();
- transport.Oneway(new ShutdownInfo());
- transport.Dispose();
- closed = true;
+ // For now we do not distinguish between Dispose() and Close().
+ // In theory Dispose should possibly be lighter-weight and perform a (faster)
+ // disorderly close.
+ Close();
}
// Properties
@@ -178,7 +219,7 @@
{
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
- SyncRequest(command);
+ transport.Oneway(command);
}
@@ -292,6 +333,13 @@
if (ExceptionListener != null)
ExceptionListener(exception);
}
+
+ internal void OnSessionException(Session sender, Exception exception)
+ {
+ Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
+ if (ExceptionListener != null)
+ ExceptionListener(exception);
+ }
protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
{
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs Fri Oct 27 10:24:35 2006
@@ -31,8 +31,23 @@
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
- readonly AutoResetEvent resetEvent = new AutoResetEvent(false);
-
+
+ // TODO can't use EventWaitHandle on MONO 1.0
+ AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
+ bool m_bAsyncDelivery = false;
+ bool m_bClosed = false;
+
+ public void SetAsyncDelivery(AutoResetEvent eventHandle)
+ {
+ lock (semaphore)
+ {
+ messageReceivedEventHandle = eventHandle;
+ m_bAsyncDelivery = true;
+ if (queue.Count > 0)
+ messageReceivedEventHandle.Set();
+ }
+ }
+
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
@@ -42,7 +57,7 @@
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver)
- {
+ {
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
@@ -54,7 +69,7 @@
}
queue = replacement;
if (queue.Count > 0)
- resetEvent.Set();
+ messageReceivedEventHandle.Set();
}
}
@@ -77,7 +92,7 @@
lock (semaphore)
{
queue.Enqueue(message);
- resetEvent.Set();
+ messageReceivedEventHandle.Set();
}
}
@@ -89,13 +104,9 @@
IMessage rc = null;
lock (semaphore)
{
- if (queue.Count > 0)
+ if (!m_bClosed && queue.Count > 0)
{
rc = (IMessage) queue.Dequeue();
- if (queue.Count > 0)
- {
- resetEvent.Set();
- }
}
}
return rc;
@@ -106,14 +117,25 @@
/// </summary>
public IMessage Dequeue(TimeSpan timeout)
{
- IMessage rc = DequeueNoWait();
- while (rc == null)
+ IMessage rc;
+ bool bClosed = false;
+ lock (semaphore)
+ {
+ bClosed = m_bClosed;
+ rc = DequeueNoWait();
+ }
+
+ while (!bClosed && rc == null)
{
- if( !resetEvent.WaitOne((int)timeout.TotalMilliseconds, false) )
+ if( !messageReceivedEventHandle.WaitOne((int)timeout.TotalMilliseconds, false) )
{
break;
}
- rc = DequeueNoWait();
+ lock (semaphore)
+ {
+ rc = DequeueNoWait();
+ bClosed = m_bClosed;
+ }
}
return rc;
}
@@ -123,18 +145,17 @@
/// </summary>
public IMessage Dequeue()
{
- IMessage rc = DequeueNoWait();
- while (rc == null)
- {
- if (!resetEvent.WaitOne(-1, false))
- {
- break;
- }
- rc = DequeueNoWait();
- }
- return rc;
+ return Dequeue(TimeSpan.MaxValue);
}
-
- }
-}
+ internal void Close()
+ {
+ lock (semaphore)
+ {
+ m_bClosed = true;
+ if(m_bAsyncDelivery)
+ messageReceivedEventHandle.Set();
+ }
+ }
+ }
+}
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs?view=auto&rev=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs Fri Oct 27 10:24:35 2006
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+
+namespace ActiveMQ
+{
+ internal class DispatchingThread
+ {
+ public delegate void DispatchFunction();
+ public delegate void ExceptionHandler(Exception exception);
+
+ private readonly AutoResetEvent m_event = new AutoResetEvent(false);
+ private bool m_bStopFlag = false;
+ private Thread m_thread = null;
+ private readonly DispatchFunction m_dispatchFunc;
+ private event ExceptionHandler m_exceptionListener;
+
+ public DispatchingThread(DispatchFunction dispatchFunc)
+ {
+ m_dispatchFunc = dispatchFunc;
+ }
+
+ // TODO can't use EventWaitHandle on MONO 1.0
+ public AutoResetEvent EventHandle
+ {
+ get { return m_event; }
+ }
+
+ internal event ExceptionHandler ExceptionListener
+ {
+ add
+ {
+ m_exceptionListener += value;
+ }
+ remove
+ {
+ m_exceptionListener -= value;
+ }
+ }
+
+ internal void Start()
+ {
+ lock (this)
+ {
+ if (m_thread == null)
+ {
+ m_bStopFlag = false;
+ m_thread = new Thread(new ThreadStart(MyThreadFunc));
+ m_event.Set();
+ Tracer.Info("Starting dispatcher thread for session");
+ m_thread.Start();
+ }
+ }
+ }
+
+ internal void Stop()
+ {
+ Stop(System.Threading.Timeout.Infinite);
+ }
+
+
+ internal void Stop(int timeoutMilliseconds)
+ {
+ Tracer.Info("Stopping dispatcher thread for session");
+ Thread localThread = null;
+ lock (this)
+ {
+ localThread = m_thread;
+ m_thread = null;
+ if (!m_bStopFlag)
+ {
+ m_bStopFlag = true;
+ m_event.Set();
+ }
+ }
+ if(localThread!=null)
+ {
+ localThread.Join(timeoutMilliseconds);
+ }
+ Tracer.Info("Dispatcher thread joined");
+ }
+
+ private void MyThreadFunc()
+ {
+ Tracer.Info("Dispatcher thread started");
+ while (true) // loop forever (well, at least until we've been asked to stop)
+ {
+ lock (this)
+ {
+ if (m_bStopFlag)
+ break;
+ }
+
+ try
+ {
+ m_dispatchFunc();
+ }
+ catch (Exception ex)
+ {
+ if (m_exceptionListener != null)
+ m_exceptionListener(ex);
+ }
+ m_event.WaitOne();
+ }
+ Tracer.Info("Dispatcher thread stopped");
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs Fri Oct 27 10:24:35 2006
@@ -38,7 +38,7 @@
private Session session;
private ConsumerInfo info;
private AcknowledgementMode acknowledgementMode;
- private bool closed;
+ private bool closed = false;
private Dispatcher dispatcher = new Dispatcher();
private int maximumRedeliveryCount = 10;
private int redeliveryTimeout = 500;
@@ -48,21 +48,26 @@
{
add {
listener += value;
- FireAsyncDispatchOfMessages();
+ session.StartAsyncDelivery(dispatcher);
}
remove {
listener -= value;
}
}
-
- public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
+ // Constructor internal to prevent clients from creating an instance.
+ internal MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
{
this.session = session;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
}
+ internal Dispatcher Dispatcher
+ {
+ get { return this.dispatcher; }
+ }
+
public ConsumerId ConsumerId
{
get {
@@ -95,19 +100,8 @@
public void Dispatch(ActiveMQMessage message)
{
dispatcher.Enqueue(message);
-
- if (listener != null)
- {
- FireAsyncDispatchOfMessages();
- }
}
- protected void FireAsyncDispatchOfMessages()
- {
- // lets dispatch to the thread pool for this connection for messages to be processed
- ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
- }
-
public IMessage Receive()
{
CheckClosed();
@@ -126,37 +120,27 @@
return AutoAcknowledge(dispatcher.DequeueNoWait());
}
-
-
public void Dispose()
{
session.DisposeOf(info.ConsumerId);
- closed = true;
+ Close();
}
/// <summary>
/// Dispatch any pending messages to the asynchronous listener
/// </summary>
- public void DispatchAsyncMessages()
+ internal void DispatchAsyncMessages()
{
while (listener != null)
{
IMessage message = dispatcher.DequeueNoWait();
- if (message != null)
- {
- //here we add the code that if do acknowledge action.
- message = AutoAcknowledge(message);
- try
- {
- listener(message);
- } catch(Exception e)
- {
- // TODO: what do do if the listener errors out?
- }
- }
-
- // lets now break to give the acknowledgement a chance to be processed
- break;
+ if (message == null)
+ break;
+
+ // pass the message through AutoAcknowledge before handing it to the listener
+ message = AutoAcknowledge(message);
+ // invoke listener. Exceptions caught by the dispatcher thread
+ listener(message);
}
}
@@ -239,16 +223,27 @@
else
{
dispatcher.Redeliver(message);
-
- if (listener != null)
- {
- // lets re-dispatch the message at some point in the future
- Thread.Sleep(RedeliveryTimeout);
- ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
- }
}
}
- }
+
+ public void Close()
+ {
+ lock(this)
+ {
+ if(closed)
+ return;
+ }
+
+ // wake up any pending dequeue() call on the dispatcher
+ dispatcher.Close();
+
+ lock (this)
+ {
+ closed = true;
+ }
+ }
+ }
+
// TODO maybe there's a cleaner way of creating stateful delegates to make this code neater
class MessageConsumerSynchronization : ISynchronization
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Fri Oct 27 10:24:35 2006
@@ -38,13 +38,21 @@
private bool retroactive;
private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
private TransactionContext transactionContext;
-
+ private DispatchingThread dispatchingThread;
+
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
transactionContext = new TransactionContext(this);
+ dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+ dispatchingThread.ExceptionListener += new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+ }
+
+ void dispatchingThread_ExceptionListener(Exception exception)
+ {
+ connection.OnSessionException(this, exception);
}
@@ -317,6 +325,12 @@
connection.SyncRequest(command);
}
+ public void Close()
+ {
+ // To do: what about session id?
+ StopAsyncDelivery();
+ }
+
/// <summary>
/// Ensures that a transaction is started
/// </summary>
@@ -335,18 +349,17 @@
connection.DisposeOf(objectId);
}
- public void DispatchAsyncMessages(object state)
+ /// <summary>
+ /// Private method called by the dispatcher thread in order to perform
+ /// asynchronous delivery of queued (inbound) messages.
+ /// </summary>
+ private void DispatchAsyncMessages()
{
// lets iterate through each consumer created by this session
// ensuring that they have all pending messages dispatched
- lock (this)
+ foreach (MessageConsumer consumer in GetConsumers())
{
- // lets ensure that only 1 thread dispatches messages in a consumer at once
-
- foreach (MessageConsumer consumer in GetConsumers())
- {
- consumer.DispatchAsyncMessages();
- }
+ consumer.DispatchAsyncMessages();
}
}
@@ -425,5 +438,17 @@
protected void Configure(ActiveMQMessage message)
{
}
+
+ internal void StopAsyncDelivery()
+ {
+ dispatchingThread.Stop();
+ }
+
+ internal void StartAsyncDelivery(Dispatcher dispatcher)
+ {
+ if(dispatcher != null)
+ dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
+ dispatchingThread.Start();
+ }
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Fri Oct 27 10:24:35 2006
@@ -23,6 +23,17 @@
{
bool value;
+ public bool Value
+ {
+ get
+ {
+ lock (this)
+ {
+ return value;
+ }
+ }
+ }
+
public AtomicBoolean(bool b)
{
value = b;
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs Fri Oct 27 10:24:35 2006
@@ -93,5 +93,11 @@
/// An asynchronous listener which can be notified if an error occurs
/// </summary>
event ExceptionListener ExceptionListener;
+
+ /// <summary>
+ /// Closes the connection.
+ /// </summary>
+ void Close();
+
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs Fri Oct 27 10:24:35 2006
@@ -46,6 +46,16 @@
/// An asynchronous listener which can be used to consume messages asynchronously
/// </summary>
event MessageListener Listener;
+
+ /// <summary>
+ /// Closes the message consumer.
+ /// </summary>
+ /// <remarks>
+ /// Clients should close message consumers when they are no longer needed.
+ /// This call blocks until a receive or message listener in progress has completed.
+ /// A blocked message consumer receive call returns null when this message consumer is closed.
+ /// </remarks>
+ void Close();
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs Fri Oct 27 10:24:35 2006
@@ -101,6 +101,11 @@
/// </summary>
IBytesMessage CreateBytesMessage(byte[] body);
+ /// <summary>
+ /// Closes the session. There is no need to close the producers and consumers
+ /// of a closed session.
+ /// </summary>
+ void Close();
// Transaction methods
@@ -115,6 +120,5 @@
/// send and acknowledgements for producers and consumers in this session
/// </summary>
void Rollback();
-
}
}