You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/12/12 19:25:52 UTC
svn commit: r726083 [1/2] - in
/activemq/activemq-dotnet/Apache.NMS.WCF/trunk: ./ src/main/csharp/
src/main/csharp/Configuration/
Author: jgomes
Date: Fri Dec 12 10:25:52 2008
New Revision: 726083
URL: http://svn.apache.org/viewvc?rev=726083&view=rev
Log:
Applying patch file from David that adds support for Sessions in the WCF component. Thanks, David!
Fixes [AMQNET-127]. (See https://issues.apache.org/activemq/browse/AMQNET-127)
Added:
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
Removed:
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs Fri Dec 12 10:25:52 2008
@@ -26,12 +26,20 @@
namespace Apache.NMS.WCF
{
/// <summary>
- /// key class to specify the custom transport class and its schema.
- /// Its key role in the WCF is to be the 'factory of factories'. It determines what shape
- /// the channel will be. In this case by returning channel factory (for the client) that returns a IOutputChannel
- /// and a channel listener (for the server) that returns a IInputChannel, this class determines
- /// that this implementation is a datagram 'shape'.
+ /// Key class to specify the custom transport class and its schema.
/// </summary>
+ /// <remarks>
+ /// <para>
+ /// Its key role in the WCF is to be the 'factory of factories'. It determines what shape
+ /// the channel will be. In this case by returning channel factory (for the client) that returns an
+ /// <see cref="IOutputChannel" /> or <see cref="IOutputSessionChannel" />, and a channel listener
+ /// (for the server) that returns an <see cref="IInputChannel" /> or <see cref="IInputSessionChannel" />,
+ /// this class determines that this implementation is a datagram 'shape'.
+ /// </para>
+ /// <para>
+ /// The request/reply channel shape is not supported by this transport binding element.
+ /// </para>
+ /// </remarks>
public class NmsTransportBindingElement : TransportBindingElement, IWsdlExportExtension, IPolicyExportExtension
{
#region Constructors
@@ -85,31 +93,31 @@
/// <summary>
/// Determines whether this instance can build a channel factory in the specified context.
- /// In this case an implementation of IOutputChannel.
+ /// Only implementations of <see cref="IOutputChannel" /> and <see cref="IOutputSessionChannel" /> are supported.
/// </summary>
/// <typeparam name="TChannel">The type of the channel.</typeparam>
/// <param name="context">The context.</param>
/// <returns>
- /// <c>true</c> if this instance [can build channel factory] the specified context; otherwise, <c>false</c>.
+ /// <c>true</c> if the requested channel factory can be built; otherwise, <c>false</c>.
/// </returns>
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
- return (typeof(TChannel) == typeof(IOutputChannel));
+ return (typeof(TChannel) == typeof(IOutputChannel) || typeof(TChannel) == typeof(IOutputSessionChannel));
}
/// <summary>
/// Determines whether this instance can build a channel listener in the specified context.
- /// In this case in implementation that will return an IInputChannel.
+ /// Only implementations of <see cref="IInputChannel" /> and <see cref="IInputSessionChannel" /> are supported.
/// </summary>
/// <typeparam name="TChannel">The type of the channel.</typeparam>
/// <param name="context">The context.</param>
/// <returns>
- /// <c>true</c> if this instance [can build channel listener] the specified context; otherwise, <c>false</c>.
+ /// <c>true</c> if the requested channel listener can be built; otherwise, <c>false</c>.
/// </returns>
/// <exception cref="ArgumentException">the requested channel does not implement <see cref="IReplyChannel" />.</exception>
public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
- return (typeof(TChannel) == typeof(IInputChannel));
+ return (typeof(TChannel) == typeof(IInputChannel) || typeof(TChannel) == typeof(IInputSessionChannel));
}
/// <summary>
@@ -129,7 +137,7 @@
{
throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
}
- return (IChannelFactory<TChannel>) new NmsChannelFactory(this, context);
+ return (IChannelFactory<TChannel>) new NmsChannelFactory<TChannel>(this, context);
}
/// <summary>
@@ -144,11 +152,17 @@
{
throw new ArgumentNullException("context");
}
+
if(!CanBuildChannelListener<TChannel>(context))
{
throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
}
- return (IChannelListener<TChannel>) new NmsChannelListener(this, context);
+
+ if (typeof(TChannel) == typeof(IInputSessionChannel))
+ {
+ return (IChannelListener<TChannel>)new NmsInputSessionChannelListener(this, context);
+ }
+ return (IChannelListener<TChannel>) new NmsInputChannelListener(this, context);
}
/// <summary>
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs Fri Dec 12 10:25:52 2008
@@ -24,24 +24,24 @@
namespace Apache.NMS.WCF
{
// ItemDequeuedCallback is called as an item is dequeued from the InputQueue. The
- // InputQueue lock is not held during the callback. However, the user code is
- // not notified of the item being available until the callback returns. If you
- // are not sure if the callback blocks for a long time, then first call
+ // InputQueue lock is not held during the callback. However, the user code will
+ // not be notified of the item being available until the callback returns. If you
+ // are not sure if the callback will block for a long time, then first call
// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
- internal delegate void ItemDequeuedCallback();
+ delegate void ItemDequeuedCallback();
/// <summary>
/// Handles asynchronous interactions between producers and consumers.
/// Producers can dispatch available data to the input queue,
- /// where it is dispatched to a waiting consumer or stored until a
+ /// where it will be dispatched to a waiting consumer or stored until a
/// consumer becomes available. Consumers can synchronously or asynchronously
- /// request data from the queue, which is returned when data becomes
+ /// request data from the queue, which will be returned when data becomes
/// available.
/// </summary>
/// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
- internal class InputQueue<T> : IDisposable where T : class
+ class InputQueue<T> : IDisposable where T : class
{
- //Stores items that are waiting to be accessed.
+ //Stores items that are waiting to be consumed.
ItemQueue itemQueue;
//Each IQueueReader represents some consumer that is waiting for
@@ -51,7 +51,7 @@
//Each IQueueWaiter represents some waiter that is waiting for
//items to appear in the queue. When any item appears, all
- //waiters are signaled.
+ //waiters are signalled.
List<IQueueWaiter> waiterList;
static WaitCallback onInvokeDequeuedCallback;
@@ -60,9 +60,9 @@
static WaitCallback completeWaitersFalseCallback;
static WaitCallback completeWaitersTrueCallback;
- //Represents the current state of the InputQueue.
+ //Represents the current state of the InputQueue
//as it transitions through its lifecycle.
- QueueState _queueState;
+ QueueState queueState;
enum QueueState
{
Open,
@@ -70,15 +70,12 @@
Closed
}
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>"/> class.
- /// </summary>
public InputQueue()
{
- itemQueue = new ItemQueue();
- readerQueue = new Queue<IQueueReader>();
- waiterList = new List<IQueueWaiter>();
- _queueState = QueueState.Open;
+ this.itemQueue = new ItemQueue();
+ this.readerQueue = new Queue<IQueueReader>();
+ this.waiterList = new List<IQueueWaiter>();
+ this.queueState = QueueState.Open;
}
public int PendingCount
@@ -92,6 +89,33 @@
}
}
+ // added by Roman
+ public int NumberOfReaders
+ {
+ get
+ {
+ lock(ThisLock)
+ {
+ return readerQueue.Count;
+ }
+ }
+ }
+ public void Open()
+ {
+ lock(ThisLock)
+ {
+ if(queueState == QueueState.Open)
+ {
+ return;
+ }
+
+ if(queueState == QueueState.Closed)
+ {
+ throw new ObjectDisposedException(this.GetType().ToString());
+ }
+ }
+ }
+
object ThisLock
{
get { return itemQueue; }
@@ -103,7 +127,7 @@
lock(ThisLock)
{
- if(_queueState == QueueState.Open)
+ if(queueState == QueueState.Open)
{
if(itemQueue.HasAvailableItem)
{
@@ -116,7 +140,7 @@
return reader;
}
}
- else if(_queueState == QueueState.Shutdown)
+ else if(queueState == QueueState.Shutdown)
{
if(itemQueue.HasAvailableItem)
{
@@ -139,7 +163,7 @@
{
lock(ThisLock)
{
- if(_queueState == QueueState.Open)
+ if(queueState == QueueState.Open)
{
if(!itemQueue.HasAvailableItem)
{
@@ -148,7 +172,7 @@
return waiter;
}
}
- else if(_queueState == QueueState.Shutdown)
+ else if(queueState == QueueState.Shutdown)
{
if(!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
{
@@ -196,7 +220,7 @@
{
if(completeWaitersTrueCallback == null)
{
- completeWaitersTrueCallback = CompleteWaitersTrueCallback;
+ completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
}
ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters);
@@ -205,7 +229,7 @@
{
if(completeWaitersFalseCallback == null)
{
- completeWaitersFalseCallback = CompleteWaitersFalseCallback;
+ completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
}
ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters);
@@ -235,19 +259,19 @@
IQueueReader[] outstandingReaders = null;
lock(ThisLock)
{
- if(_queueState == QueueState.Shutdown)
+ if(queueState == QueueState.Shutdown)
{
return;
}
- if(_queueState == QueueState.Closed)
+ if(queueState == QueueState.Closed)
{
return;
}
- _queueState = QueueState.Shutdown;
+ this.queueState = QueueState.Shutdown;
- if(readerQueue.Count > 0 && itemQueue.ItemCount == 0)
+ if(readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
{
outstandingReaders = new IQueueReader[readerQueue.Count];
readerQueue.CopyTo(outstandingReaders, 0);
@@ -268,7 +292,7 @@
{
T value;
- if(!Dequeue(timeout, out value))
+ if(!this.Dequeue(timeout, out value))
{
throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
}
@@ -283,7 +307,7 @@
lock(ThisLock)
{
- if(_queueState == QueueState.Open)
+ if(queueState == QueueState.Open)
{
if(itemQueue.HasAvailableItem)
{
@@ -295,7 +319,7 @@
readerQueue.Enqueue(reader);
}
}
- else if(_queueState == QueueState.Shutdown)
+ else if(queueState == QueueState.Shutdown)
{
if(itemQueue.HasAvailableItem)
{
@@ -323,15 +347,18 @@
{
return reader.Wait(timeout, out value);
}
-
- InvokeDequeuedCallback(item.DequeuedCallback);
- value = item.GetValue();
- return true;
+ else
+ {
+ InvokeDequeuedCallback(item.DequeuedCallback);
+ value = item.GetValue();
+ return true;
+ }
}
public void Dispose()
{
Dispose(true);
+
GC.SuppressFinalize(this);
}
@@ -343,9 +370,9 @@
lock(ThisLock)
{
- if(_queueState != QueueState.Closed)
+ if(queueState != QueueState.Closed)
{
- _queueState = QueueState.Closed;
+ queueState = QueueState.Closed;
dispose = true;
}
}
@@ -378,10 +405,10 @@
lock(ThisLock)
{
- itemAvailable = !((_queueState == QueueState.Closed) || (_queueState == QueueState.Shutdown));
- GetWaiters(out waiters);
+ itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
+ this.GetWaiters(out waiters);
- if(_queueState != QueueState.Closed)
+ if(queueState != QueueState.Closed)
{
itemQueue.MakePendingItemAvailable();
@@ -390,7 +417,7 @@
item = itemQueue.DequeueAvailableItem();
reader = readerQueue.Dequeue();
- if(_queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
+ if(queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
{
outstandingReaders = new IQueueReader[readerQueue.Count];
readerQueue.CopyTo(outstandingReaders, 0);
@@ -405,7 +432,9 @@
if(outstandingReaders != null)
{
if(completeOutstandingReadersCallback == null)
- completeOutstandingReadersCallback = CompleteOutstandingReadersCallback;
+ {
+ completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
+ }
ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders);
}
@@ -427,7 +456,7 @@
{
T value;
- if(!EndDequeue(result, out value))
+ if(!this.EndDequeue(result, out value))
{
throw new TimeoutException("Asynchronous Dequeue operation timed out.");
}
@@ -467,6 +496,7 @@
public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
{
EnqueueAndDispatch(item, dequeuedCallback, true);
+ //EnqueueAndDispatch(item, dequeuedCallback, false);
}
public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
@@ -491,10 +521,10 @@
lock(ThisLock)
{
- itemAvailable = !((_queueState == QueueState.Closed) || (_queueState == QueueState.Shutdown));
- GetWaiters(out waiters);
+ itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
+ this.GetWaiters(out waiters);
- if(_queueState == QueueState.Open)
+ if(queueState == QueueState.Open)
{
if(canDispatchOnThisThread)
{
@@ -548,7 +578,7 @@
{
if(onDispatchCallback == null)
{
- onDispatchCallback = OnDispatchCallback;
+ onDispatchCallback = new WaitCallback(OnDispatchCallback);
}
ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
@@ -572,22 +602,25 @@
return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
}
- // This does not block, however, Dispatch() must be called later if this function
+ // This will not block, however, Dispatch() must be called later if this function
// returns true.
bool EnqueueWithoutDispatch(Item item)
{
lock(ThisLock)
{
// Open
- if(_queueState != QueueState.Closed && _queueState != QueueState.Shutdown)
+ if(queueState != QueueState.Closed && queueState != QueueState.Shutdown)
{
if(readerQueue.Count == 0)
{
itemQueue.EnqueueAvailableItem(item);
return false;
}
- itemQueue.EnqueuePendingItem(item);
- return true;
+ else
+ {
+ itemQueue.EnqueuePendingItem(item);
+ return true;
+ }
}
}
@@ -632,14 +665,14 @@
{
lock(ThisLock)
{
- if(_queueState == QueueState.Open || _queueState == QueueState.Shutdown)
+ if(queueState == QueueState.Open || queueState == QueueState.Shutdown)
{
bool removed = false;
for(int i = readerQueue.Count; i > 0; i--)
{
IQueueReader temp = readerQueue.Dequeue();
- if(ReferenceEquals(temp, reader))
+ if(Object.ReferenceEquals(temp, reader))
{
removed = true;
}
@@ -663,7 +696,7 @@
lock(ThisLock)
{
- if(_queueState == QueueState.Open)
+ if(queueState == QueueState.Open)
{
if(itemQueue.HasAvailableItem)
{
@@ -675,7 +708,7 @@
waiterList.Add(waiter);
}
}
- else if(_queueState == QueueState.Shutdown)
+ else if(queueState == QueueState.Shutdown)
{
if(itemQueue.HasAvailableItem)
{
@@ -697,7 +730,14 @@
}
}
- return waiter != null ? waiter.Wait(timeout) : itemAvailable;
+ if(waiter != null)
+ {
+ return waiter.Wait(timeout);
+ }
+ else
+ {
+ return itemAvailable;
+ }
}
interface IQueueReader
@@ -712,23 +752,23 @@
class WaitQueueReader : IQueueReader
{
- Exception _exception;
- InputQueue<T> _inputQueue;
- T _item;
- ManualResetEvent _waitEvent;
- object _thisLock = new object();
+ Exception exception;
+ InputQueue<T> inputQueue;
+ T item;
+ ManualResetEvent waitEvent;
+ object thisLock = new object();
public WaitQueueReader(InputQueue<T> inputQueue)
{
- _inputQueue = inputQueue;
- _waitEvent = new ManualResetEvent(false);
+ this.inputQueue = inputQueue;
+ waitEvent = new ManualResetEvent(false);
}
object ThisLock
{
get
{
- return _thisLock;
+ return this.thisLock;
}
}
@@ -736,12 +776,12 @@
{
lock(ThisLock)
{
- Debug.Assert(_item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
- Debug.Assert(_exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
+ Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
+ Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
- _exception = item.Exception;
- _item = item.Value;
- _waitEvent.Set();
+ this.exception = item.Exception;
+ this.item = item.Value;
+ waitEvent.Set();
}
}
@@ -752,11 +792,11 @@
{
if(timeout == TimeSpan.MaxValue)
{
- _waitEvent.WaitOne();
+ waitEvent.WaitOne();
}
- else if(!_waitEvent.WaitOne(timeout, false))
+ else if(!waitEvent.WaitOne(timeout, false))
{
- if(_inputQueue.RemoveReader(this))
+ if(this.inputQueue.RemoveReader(this))
{
value = default(T);
isSafeToClose = true;
@@ -764,7 +804,7 @@
}
else
{
- _waitEvent.WaitOne();
+ waitEvent.WaitOne();
}
}
@@ -774,31 +814,31 @@
{
if(isSafeToClose)
{
- _waitEvent.Close();
+ waitEvent.Close();
}
}
- value = _item;
+ value = item;
return true;
}
}
- class AsyncQueueReader : AsyncResult, IQueueReader
+ public class AsyncQueueReader : AsyncResult, IQueueReader
{
static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
- bool _expired;
- InputQueue<T> _inputQueue;
- T _item;
- Timer _timer;
+ bool expired;
+ InputQueue<T> inputQueue;
+ T item;
+ Timer timer;
public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
: base(callback, state)
{
- _inputQueue = inputQueue;
+ this.inputQueue = inputQueue;
if(timeout != TimeSpan.MaxValue)
{
- _timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+ this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
}
}
@@ -806,262 +846,190 @@
{
AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
- if(readerResult._expired)
+ if(readerResult.expired)
{
value = default(T);
return false;
}
-
- value = readerResult._item;
- return true;
+ else
+ {
+ value = readerResult.item;
+ return true;
+ }
}
static void TimerCallback(object state)
{
AsyncQueueReader thisPtr = (AsyncQueueReader) state;
- if(thisPtr._inputQueue.RemoveReader(thisPtr))
+ if(thisPtr.inputQueue.RemoveReader(thisPtr))
{
- thisPtr._expired = true;
+ thisPtr.expired = true;
thisPtr.Complete(false);
}
}
public void Set(Item item)
{
- _item = item.Value;
- if(_timer != null)
+ this.item = item.Value;
+ if(this.timer != null)
{
- _timer.Change(-1, -1);
+ this.timer.Change(-1, -1);
}
Complete(false, item.Exception);
}
}
- internal struct Item
+ public struct Item
{
- private T _value;
- private Exception _exception;
- ItemDequeuedCallback _dequeuedCallback;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.Item"/> class.
- /// </summary>
- /// <param name="value">The value.</param>
- /// <param name="dequeuedCallback">The dequeued callback.</param>
+ T value;
+ Exception exception;
+ ItemDequeuedCallback dequeuedCallback;
+
public Item(T value, ItemDequeuedCallback dequeuedCallback)
: this(value, null, dequeuedCallback)
{
}
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.Item"/> class.
- /// </summary>
- /// <param name="exception">The exception.</param>
- /// <param name="dequeuedCallback">The dequeued callback.</param>
public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
: this(null, exception, dequeuedCallback)
{
}
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.Item"/> class.
- /// </summary>
- /// <param name="value">The value.</param>
- /// <param name="exception">The exception.</param>
- /// <param name="dequeuedCallback">The dequeued callback.</param>
- internal Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
- {
- _value = value;
- _exception = exception;
- _dequeuedCallback = dequeuedCallback;
- }
-
- /// <summary>
- /// Gets the exception.
- /// </summary>
- /// <value>The exception.</value>
+ Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
+ {
+ this.value = value;
+ this.exception = exception;
+ this.dequeuedCallback = dequeuedCallback;
+ }
+
public Exception Exception
{
- get { return _exception; }
+ get { return this.exception; }
}
- /// <summary>
- /// Gets the value.
- /// </summary>
- /// <value>The value.</value>
public T Value
{
- get { return _value; }
+ get { return value; }
}
- /// <summary>
- /// Gets the dequeued callback.
- /// </summary>
- /// <value>The dequeued callback.</value>
public ItemDequeuedCallback DequeuedCallback
{
- get { return _dequeuedCallback; }
+ get { return dequeuedCallback; }
}
- /// <summary>
- /// Releases unmanaged and - optionally - managed resources
- /// </summary>
public void Dispose()
{
- if(_value != null)
+ if(value != null)
{
- if(_value is IDisposable)
+ if(value is IDisposable)
{
- ((IDisposable) _value).Dispose();
+ ((IDisposable) value).Dispose();
}
- else if(_value is ICommunicationObject)
+ else if(value is ICommunicationObject)
{
- ((ICommunicationObject) _value).Abort();
+ ((ICommunicationObject) value).Abort();
}
}
}
- /// <summary>
- /// Gets the value.
- /// </summary>
- /// <returns></returns>
public T GetValue()
{
- if(_exception != null)
+ if(this.exception != null)
{
- throw _exception;
+ throw this.exception;
}
- return _value;
+ return this.value;
}
}
- internal class WaitQueueWaiter : IQueueWaiter
+ class WaitQueueWaiter : IQueueWaiter
{
- bool _itemAvailable;
- ManualResetEvent _waitEvent;
- object _thisLock = new object();
+ bool itemAvailable;
+ ManualResetEvent waitEvent;
+ object thisLock = new object();
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.WaitQueueWaiter"/> class.
- /// </summary>
public WaitQueueWaiter()
{
- _waitEvent = new ManualResetEvent(false);
+ waitEvent = new ManualResetEvent(false);
}
- /// <summary>
- /// Gets the this lock.
- /// </summary>
- /// <value>The this lock.</value>
object ThisLock
{
get
{
- return _thisLock;
+ return this.thisLock;
}
}
- /// <summary>
- /// Sets the specified item available.
- /// </summary>
- /// <param name="itemAvailable">if set to <see langword="true"/> [item available].</param>
public void Set(bool itemAvailable)
{
lock(ThisLock)
{
- _itemAvailable = itemAvailable;
- _waitEvent.Set();
+ this.itemAvailable = itemAvailable;
+ waitEvent.Set();
}
}
- /// <summary>
- /// Waits the specified timeout.
- /// </summary>
- /// <param name="timeout">The timeout.</param>
- /// <returns></returns>
public bool Wait(TimeSpan timeout)
{
if(timeout == TimeSpan.MaxValue)
{
- _waitEvent.WaitOne();
+ waitEvent.WaitOne();
}
- else if(!_waitEvent.WaitOne(timeout, false))
+ else if(!waitEvent.WaitOne(timeout, false))
{
return false;
}
- return _itemAvailable;
+ return this.itemAvailable;
}
}
- internal class AsyncQueueWaiter : AsyncResult, IQueueWaiter
+ class AsyncQueueWaiter : AsyncResult, IQueueWaiter
{
static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
- Timer _timer;
- bool _itemAvailable;
- object _thisLock = new object();
-
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.AsyncQueueWaiter"/> class.
- /// </summary>
- /// <param name="timeout">The timeout.</param>
- /// <param name="callback">The callback.</param>
- /// <param name="state">The state.</param>
+ Timer timer;
+ bool itemAvailable;
+ object thisLock = new object();
+
public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
: base(callback, state)
{
if(timeout != TimeSpan.MaxValue)
{
- _timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+ this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
}
}
- /// <summary>
- /// Gets the this lock.
- /// </summary>
- /// <value>The this lock.</value>
object ThisLock
{
get
{
- return _thisLock;
+ return this.thisLock;
}
}
- /// <summary>
- /// Ends the specified result.
- /// </summary>
- /// <param name="result">The result.</param>
- /// <returns></returns>
public static bool End(IAsyncResult result)
{
AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
- return waiterResult._itemAvailable;
+ return waiterResult.itemAvailable;
}
- /// <summary>
- /// Callback that is invoked when the timer completes.
- /// </summary>
- /// <param name="state">The state.</param>
- public static void TimerCallback(object state)
+ static void TimerCallback(object state)
{
AsyncQueueWaiter thisPtr = (AsyncQueueWaiter) state;
thisPtr.Complete(false);
}
- /// <summary>
- /// Sets the specified item available.
- /// </summary>
- /// <param name="itemAvailable">if set to <see langword="true"/> [item available].</param>
public void Set(bool itemAvailable)
{
bool timely;
lock(ThisLock)
{
- timely = (_timer == null) || _timer.Change(-1, -1);
- _itemAvailable = itemAvailable;
+ timely = (this.timer == null) || this.timer.Change(-1, -1);
+ this.itemAvailable = itemAvailable;
}
if(timely)
@@ -1071,142 +1039,104 @@
}
}
- internal class ItemQueue
+ class ItemQueue
{
- Item[] _items;
- int _head;
- int _pendingCount;
- int _totalCount;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="InputQueue<T>.ItemQueue"/> class.
- /// </summary>
+ Item[] items;
+ int head;
+ int pendingCount;
+ int totalCount;
+
public ItemQueue()
{
- _items = new Item[1];
+ items = new Item[1];
}
- /// <summary>
- /// Dequeues the available item.
- /// </summary>
- /// <returns></returns>
public Item DequeueAvailableItem()
{
- if(_totalCount == _pendingCount)
+ if(totalCount == pendingCount)
{
- throw new Exception("Internal Error - ItemQueue does not contain any available items");
+ Debug.Assert(false, "ItemQueue does not contain any available items");
+ throw new Exception("Internal Error");
}
return DequeueItemCore();
}
- /// <summary>
- /// Dequeues any item.
- /// </summary>
- /// <returns></returns>
public Item DequeueAnyItem()
{
- if(_pendingCount == _totalCount)
+ if(pendingCount == totalCount)
{
- _pendingCount--;
+ pendingCount--;
}
return DequeueItemCore();
}
- /// <summary>
- /// Enqueues the item core.
- /// </summary>
- /// <param name="item">The item.</param>
void EnqueueItemCore(Item item)
{
- if(_totalCount == _items.Length)
+ if(totalCount == items.Length)
{
- Item[] newItems = new Item[_items.Length * 2];
- for(int i = 0; i < _totalCount; i++)
+ Item[] newItems = new Item[items.Length * 2];
+ for(int i = 0; i < totalCount; i++)
{
- newItems[i] = _items[(_head + i) % _items.Length];
+ newItems[i] = items[(head + i) % items.Length];
}
- _head = 0;
- _items = newItems;
+
+ head = 0;
+ items = newItems;
}
- int tail = (_head + _totalCount) % _items.Length;
- _items[tail] = item;
- _totalCount++;
+ int tail = (head + totalCount) % items.Length;
+ items[tail] = item;
+ totalCount++;
}
- /// <summary>
- /// Dequeues the item core.
- /// </summary>
- /// <returns></returns>
Item DequeueItemCore()
{
- if(_totalCount == 0)
+ if(totalCount == 0)
{
- throw new Exception("Internal Error - ItemQueue does not contain any items");
+ Debug.Assert(false, "ItemQueue does not contain any items");
+ throw new Exception("Internal Error");
}
- Item item = _items[_head];
- _items[_head] = new Item();
- _totalCount--;
- _head = (_head + 1) % _items.Length;
+ Item item = items[head];
+ items[head] = new Item();
+ totalCount--;
+ head = (head + 1) % items.Length;
return item;
}
- /// <summary>
- /// Enqueues the pending item.
- /// </summary>
- /// <param name="item">The item.</param>
public void EnqueuePendingItem(Item item)
{
EnqueueItemCore(item);
- _pendingCount++;
+ pendingCount++;
}
- /// <summary>
- /// Enqueues the available item.
- /// </summary>
- /// <param name="item">The item.</param>
public void EnqueueAvailableItem(Item item)
{
EnqueueItemCore(item);
}
- /// <summary>
- /// Makes the pending item available.
- /// </summary>
public void MakePendingItemAvailable()
{
- if(_pendingCount == 0)
+ if(pendingCount == 0)
{
- throw new Exception("Internal Error - ItemQueue does not contain any pending items");
+ Debug.Assert(false, "ItemQueue does not contain any pending items");
+ throw new Exception("Internal Error");
}
- _pendingCount--;
+ pendingCount--;
}
- /// <summary>
- /// Gets a value indicating whether this instance has available items.
- /// </summary>
- /// <value>
- /// <see langword="true"/> if this instance has available item; otherwise, <see langword="false"/>.
- /// </value>
public bool HasAvailableItem
{
- get { return _totalCount > _pendingCount; }
+ get { return totalCount > pendingCount; }
}
- /// <summary>
- /// Gets a value indicating whether this instance has any item.
- /// </summary>
- /// <value>
- /// <see langword="true"/> if this instance has any item; otherwise, <see langword="false"/>.
- /// </value>
public bool HasAnyItem
{
- get { return _totalCount > 0; }
+ get { return totalCount > 0; }
}
public int ItemCount
{
- get { return _totalCount; }
+ get { return totalCount; }
}
}
}
-}
\ No newline at end of file
+}
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs Fri Dec 12 10:25:52 2008
@@ -25,18 +25,20 @@
/// <summary>
/// Factory for message channels.
/// </summary>
- public class NmsChannelFactory : ChannelFactoryBase<IOutputChannel>
+ public class NmsChannelFactory<TChannel> : ChannelFactoryBase<TChannel>
{
#region Constructors
/// <summary>
- /// Initializes a new instance of the <see cref="NmsChannelFactory"/> class.
+ /// Initializes a new instance of the <see cref="NmsChannelFactory{TChannel}"/> class.
/// </summary>
+ /// <param name="bindingElement">The binding element.</param>
/// <param name="context">The context.</param>
- /// <param name="transportElement">The binding element.</param>
- internal NmsChannelFactory(NmsTransportBindingElement transportElement, BindingContext context)
+ internal NmsChannelFactory(NmsTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
{
+ _bindingElement = bindingElement;
+
Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
if(messageEncoderBindingElements.Count > 1)
{
@@ -46,9 +48,9 @@
? NmsConstants.DefaultMessageEncoderFactory
: messageEncoderBindingElements[0].CreateMessageEncoderFactory();
- _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, Int32.MaxValue);
- _destination = transportElement.Destination;
- _destinationType = transportElement.DestinationType;
+ _bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, Int32.MaxValue);
+ _destination = bindingElement.Destination;
+ _destinationType = bindingElement.DestinationType;
Tracer.DebugFormat("Destination ({0}) : {1}", _destinationType, _destination);
}
@@ -100,9 +102,24 @@
/// </returns>
/// <param name="address">The <see cref="T:System.ServiceModel.EndpointAddress" /> of the remote endpoint to which the channel sends messages.</param>
/// <param name="via">The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel.</param>
- protected override IOutputChannel OnCreateChannel(EndpointAddress address, Uri via)
+ protected override TChannel OnCreateChannel(EndpointAddress address, Uri via)
{
- return new NmsOutputChannel(BufferManager, MessageEncoderFactory, address, this, via);
+ if(!String.Equals(address.Uri.Scheme, _bindingElement.Scheme, StringComparison.InvariantCultureIgnoreCase))
+ {
+ throw new ArgumentException(String.Format("The scheme {0} specified in address is not supported.", address.Uri.Scheme), "remoteAddress");
+ }
+
+ if(typeof(TChannel) == typeof(IOutputChannel))
+ {
+ return (TChannel) (object) new NmsOutputChannel(this, address, via, BufferManager, MessageEncoderFactory, Destination, DestinationType);
+ }
+
+ if(typeof(TChannel) == typeof(IOutputSessionChannel))
+ {
+ return (TChannel) (object) new NmsOutputSessionChannel(this, via, address, BufferManager, MessageEncoderFactory, Destination, DestinationType);
+ }
+
+ throw new NotSupportedException(String.Format("The requested channel type {0} is not supported", typeof(TChannel)));
}
#endregion
@@ -156,7 +173,8 @@
private readonly MessageEncoderFactory _encoderFactory;
private readonly string _destination;
private readonly DestinationType _destinationType;
+ private readonly NmsTransportBindingElement _bindingElement;
#endregion
}
-}
\ No newline at end of file
+}
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs Fri Dec 12 10:25:52 2008
@@ -38,33 +38,11 @@
}
/// <summary>
- /// Gets the name of the queue from the URI.
+ /// Creates a unique session identifier.
/// </summary>
- /// <param name="uri">The URI of the message queue.</param>
- public static string GetQueueName(Uri uri)
+ public static string CreateUniqueSessionId()
{
- return uri.LocalPath.TrimStart('/');
- }
-
- /// <summary>
- /// Gets the destination.
- /// </summary>
- /// <param name="session">The session.</param>
- /// <param name="destination">The destination.</param>
- /// <param name="destinationType">Type of the destination.</param>
- public static IDestination GetDestination(NMS.ISession session, string destination, DestinationType destinationType)
- {
- switch(destinationType)
- {
- case DestinationType.Topic:
- return session.GetTopic(destination);
- case DestinationType.TemporaryQueue:
- return session.CreateTemporaryQueue();
- case DestinationType.TemporaryTopic:
- return session.CreateTemporaryTopic();
- default:
- return session.GetQueue(destination);
- }
+ return "uuid:/session-gram/" + Guid.NewGuid();
}
}
-}
\ No newline at end of file
+}
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs Fri Dec 12 10:25:52 2008
@@ -18,323 +18,192 @@
using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
-using System.Text;
namespace Apache.NMS.WCF
{
/// <summary>
- /// Channel for receiving messages.
+ /// Server-side implementation of the sessionless one-way channel.
/// </summary>
- public class NmsInputChannel : NmsChannelBase, IInputChannel
+ public class NmsInputChannel : NmsInputQueueChannelBase<Message>, IInputChannel
{
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="NmsInputChannel"/> class.
/// </summary>
- /// <param name="bufferManager">The buffer manager.</param>
- /// <param name="encoderFactory">The encoder factory.</param>
- /// <param name="address">The address.</param>
- /// <param name="parent">The parent.</param>
- /// <exception cref="T:System.ArgumentNullException">
- /// <paramref name="channelManager"/> is null.</exception>
- public NmsInputChannel(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, NmsChannelListener parent)
- : base(bufferManager, encoderFactory, address, parent, parent.Destination, parent.DestinationType)
+ /// <param name="factory">The factory that was used to create the channel.</param>
+ /// <param name="localAddress">The local address of the channel.</param>
+ internal NmsInputChannel(ChannelListenerBase factory, EndpointAddress localAddress)
+ : base(factory, localAddress)
{
- _localAddress = address;
- _messages = new InputQueue<Message>();
}
#endregion
- //Hands the message off to other components higher up the
- //channel stack that have previously called BeginReceive()
- //and are waiting for messages to arrive on this channel.
- internal void Dispatch(Message message)
- {
- _messages.EnqueueAndDispatch(message);
- }
+ #region Receive
/// <summary>
- /// Gets the property.
- /// </summary>
- /// <typeparam name="T">The type of the property to attempt to retrieve.</typeparam>
- public override T GetProperty<T>()
- {
- if(typeof(T) == typeof(IInputChannel))
- {
- return (T) (object) this;
- }
-
- T messageEncoderProperty = Encoder.GetProperty<T>();
- if(messageEncoderProperty != null)
- {
- return messageEncoderProperty;
- }
-
- return base.GetProperty<T>();
- }
-
- #region IInputChannel Members
-
- /// <summary>
- /// Returns the message received, if one is available. If a message is not available, blocks for a default interval of time.
+ /// Begins an asynchronous operation to receive a message that has a state object associated with it.
/// </summary>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
/// <returns>
- /// The <see cref="T:System.ServiceModel.Channels.Message" /> received.
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous message reception.
/// </returns>
- public Message Receive()
+ public IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
- return Receive(DefaultReceiveTimeout);
+ return BeginReceive(DefaultReceiveTimeout, callback, state);
}
/// <summary>
- /// Returns the message received, if one is available. If a message is not available, blocks for a specified interval of time.
+ /// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
/// <returns>
- /// The <see cref="T:System.ServiceModel.Channels.Message" /> received.
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous receive operation.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the receive operation has to complete before timing out and throwing a <see cref="T:System.TimeoutException" />.</param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
- public Message Receive(TimeSpan timeout)
+ public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
- Message message;
- if(TryReceive(timeout, out message))
- {
- return message;
- }
- throw new TimeoutException(String.Format("Receive timed out after {0}. The time allotted to this operation may have been a portion of a longer timeout.", timeout));
+ return BeginDequeue(timeout, callback, state);
}
/// <summary>
- /// Tries to receive a message within a specified interval of time.
+ /// Completes an asynchronous operation to receive a message.
/// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by a call to one of the <c>System.ServiceModel.Channels.IInputChannel.BeginReceive</c> methods.</param>
/// <returns>
- /// true if a message is received before the <paramref name="timeout" /> has been exceeded; otherwise false.
+ /// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.IAsyncResult" /> returned by a call to one of the <see cref="System.ServiceModel.Channels.IInputChannel.BeginReceive(AsyncCallback, object)" /> methods.</param>
- /// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message" /> received. </param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
- /// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
- public bool TryReceive(TimeSpan timeout, out Message message)
+ public Message EndReceive(IAsyncResult result)
{
- NmsChannelHelper.ValidateTimeout(timeout);
- return _messages.Dequeue(timeout, out message);
+ return EndDequeue(result);
}
/// <summary>
- /// Begins an asynchronous operation to receive a message that has a state object associated with it.
+ /// Returns the message received, if one is available. If a message is not available, blocks for a default interval of time.
/// </summary>
/// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous message reception.
+ /// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
/// </returns>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
- public IAsyncResult BeginReceive(AsyncCallback callback, object state)
+ public Message Receive()
{
- return BeginReceive(DefaultReceiveTimeout, callback, state);
+ return Receive(DefaultReceiveTimeout);
}
/// <summary>
- /// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
+ /// Returns the message received, if one is available. If a message is not available, blocks for a specified interval of time.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the receive operation has to complete before timing out and throwing a <see cref="T:System.TimeoutException"/>.</param>
/// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous receive operation.
+ /// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
- public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
+ public Message Receive(TimeSpan timeout)
{
- return BeginTryReceive(timeout, callback, state);
+ return this.Dequeue(timeout);
}
- /// <summary>
- /// Completes an asynchronous operation to receive a message.
- /// </summary>
- /// <returns>
- /// The <see cref="T:System.ServiceModel.Channels.Message" /> received.
- /// </returns>
- /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to one of the <see cref="System.ServiceModel.Channels.IInputChannel.BeginReceive(AsyncCallback, object)" /> methods.</param>
- public Message EndReceive(IAsyncResult result)
- {
- return _messages.EndDequeue(result);
- }
+ #endregion
+
+ #region TryReceive
/// <summary>
- /// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
+ /// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
/// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous receive operation.
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous receive operation.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
- NmsChannelHelper.ValidateTimeout(timeout);
- return _messages.BeginDequeue(timeout, callback, state);
+ return BeginDequeue(timeout, callback, state);
}
/// <summary>
/// Completes the specified asynchronous operation to receive a message.
/// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by a call to the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginTryReceive(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+ /// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message"/> received.</param>
/// <returns>
/// true if a message is received before the specified interval of time elapses; otherwise false.
/// </returns>
- /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginTryReceive(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
- /// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message" /> received. </param>
public bool EndTryReceive(IAsyncResult result, out Message message)
{
- return _messages.EndDequeue(result, out message);
+ message = null;
+ return TryDequeue(result, out message);
}
/// <summary>
- /// Returns a value that indicates whether a message has arrived within a specified interval of time.
+ /// Tries to receive a message within a specified interval of time.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the maximum interval of time to wait for a message to be received.</param>
+ /// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message"/> received.</param>
/// <returns>
- /// true if a message has arrived before the <paramref name="timeout" /> has been exceeded; otherwise false.
+ /// true if a message is received before the <paramref name="timeout"/> has been exceeded; otherwise false.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> specifies the maximum interval of time to wait for a message to arrive before timing out.</param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
- public bool WaitForMessage(TimeSpan timeout)
+ public bool TryReceive(TimeSpan timeout, out Message message)
{
- NmsChannelHelper.ValidateTimeout(timeout);
- return _messages.WaitForItem(timeout);
+ message = Receive(timeout);
+ return true;
}
+ #endregion
+
+ #region WaitForMessage
+
/// <summary>
- /// Begins an asynchronous wait-for-a-message-to-arrive operation that has a specified time out and state object associated with it.
+ /// Begins an asynchronous wait-for-a-message-to-arrive operation that has a specified time out and state object associated with it.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
/// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous operation to wait for a message to arrive.
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous operation to wait for a message to arrive.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
- /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
{
- NmsChannelHelper.ValidateTimeout(timeout);
- return _messages.BeginWaitForItem(timeout, callback, state);
+ throw new NotImplementedException();
}
/// <summary>
/// Completes the specified asynchronous wait-for-a-message operation.
/// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult"/> that identifies the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginWaitForMessage(System.TimeSpan,System.AsyncCallback,System.Object)"/> operation to finish, and from which to retrieve an end result.</param>
/// <returns>
- /// true if a message has arrived before the timeout has been exceeded; otherwise false.
+ /// true if a message has arrived before the timeout has been exceeded; otherwise false.
/// </returns>
- /// <param name="result">The <see cref="T:System.IAsyncResult" /> that identifies the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginWaitForMessage(System.TimeSpan,System.AsyncCallback,System.Object)" /> operation to finish, and from which to retrieve an end result.</param>
public bool EndWaitForMessage(IAsyncResult result)
{
- return _messages.EndWaitForItem(result);
+ throw new NotImplementedException();
}
/// <summary>
- /// Gets the address on which the input channel receives messages.
- /// </summary>
- /// <returns>
- /// The <see cref="T:System.ServiceModel.EndpointAddress" /> on which the input channel receives messages.
- /// </returns>
- public EndpointAddress LocalAddress
- {
- get { return _localAddress; }
- }
-
- #endregion
-
- /// <summary>
- /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
- /// </summary>
- protected override void OnAbort()
- {
- OnClose(TimeSpan.Zero);
- }
-
- /// <summary>
- /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
- /// </summary>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
- /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
- protected override void OnClose(TimeSpan timeout)
- {
- NmsChannelHelper.ValidateTimeout(timeout);
- _messages.Close();
- }
-
- /// <summary>
- /// Completes an asynchronous operation on the close of a communication object.
- /// </summary>
- /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)" /> method.</param>
- protected override void OnEndClose(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
-
- /// <summary>
- /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
- /// </summary>
- /// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation.
- /// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
- /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
- protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
- {
- OnClose(timeout);
- return new CompletedAsyncResult(callback, state);
- }
-
- /// <summary>
- /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
- /// </summary>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
- /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
- /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout" /> that was allotted for the operation was exceeded before the operation was completed.</exception>
- protected override void OnOpen(TimeSpan timeout)
- {
- }
-
- /// <summary>
- /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+ /// Returns a value that indicates whether a message has arrived within a specified interval of time.
/// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the maximum interval of time to wait for a message to arrive before timing out.</param>
/// <returns>
- /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation.
+ /// true if a message has arrived before the <paramref name="timeout"/> has been exceeded; otherwise false.
/// </returns>
- /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
- /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
- /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
- /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
- protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new CompletedAsyncResult(callback, state);
- }
-
- /// <summary>
- /// Completes an asynchronous operation on the open of a communication object.
- /// </summary>
- /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)" /> method.</param>
- /// <exception cref="T:System.TimeoutException">The interval of time specified by the timeout that was allotted for the operation was exceeded before the operation was completed.</exception>
- protected override void OnEndOpen(IAsyncResult result)
+ /// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
+ public bool WaitForMessage(TimeSpan timeout)
{
- CompletedAsyncResult.End(result);
+ throw new NotImplementedException();
}
- #region Private members
-
- private readonly InputQueue<Message> _messages;
- private EndpointAddress _localAddress;
-
#endregion
}
-}
\ No newline at end of file
+}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Server-side listener for sessionless input channels.
+ /// </summary>
+ public class NmsInputChannelListener : ChannelListenerBase<IInputChannel>
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="NmsInputChannelListener"/> class.
+ /// </summary>
+ /// <remarks>
+ /// Creates the buffer manager and the channel queue, selects the message encoder factory
+ /// (the one supplied through the binding parameters, or the default when none is supplied),
+ /// copies the destination settings from the transport element and builds the listen URI
+ /// from the binding context.
+ /// </remarks>
+ /// <param name="transportElement">The binding element that supplies the buffer limits and the destination settings.</param>
+ /// <param name="context">The binding context that supplies the binding, the binding parameters and the listen URI parts.</param>
+ internal NmsInputChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
+     : base(context.Binding)
+ {
+     // The configured maximum message size is wider than the 32-bit buffer size accepted by the
+     // buffer manager (hence the cast); clamp it so that a large value cannot wrap around.
+     int maxBufferSize = (int) Math.Min(transportElement.MaxReceivedMessageSize, (long) Int32.MaxValue);
+     _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, maxBufferSize);
+
+     // Removing the element from the binding parameters also tells us whether one was configured.
+     MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+     _messageEncoderFactory = (messageEncoderBindingElement != null)
+         ? messageEncoderBindingElement.CreateMessageEncoderFactory()
+         : NmsConstants.DefaultMessageEncoderFactory;
+
+     _channelQueue = new InputQueue<IInputChannel>();
+     _currentChannelLock = new object();
+     _destinationName = transportElement.Destination;
+     _destinationType = transportElement.DestinationType;
+     _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+     Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+ }
+
+ #endregion
+
+ #region Public properties
+
+ /// <summary>
+ /// Gets the factory that was selected in the constructor for producing message encoders.
+ /// </summary>
+ /// <value>The message encoder factory used by this listener.</value>
+ public MessageEncoderFactory MessageEncoderFactory
+ {
+     get
+     {
+         return _messageEncoderFactory;
+     }
+ }
+
+ /// <summary>
+ /// Gets or sets the name of the destination that this listener consumes from.
+ /// </summary>
+ /// <value>The destination name.</value>
+ public string Destination
+ {
+     get
+     {
+         return _destinationName;
+     }
+     set
+     {
+         _destinationName = value;
+     }
+ }
+
+ /// <summary>
+ /// Gets or sets the kind of destination that <see cref="Destination"/> names.
+ /// </summary>
+ /// <value>The destination type.</value>
+ public DestinationType DestinationType
+ {
+     get
+     {
+         return _destinationType;
+     }
+     set
+     {
+         _destinationType = value;
+     }
+ }
+
+ #endregion
+
+ #region Implementation of CommunicationObject
+
+ /// <summary>
+ /// Handles an abort of the listener by running the regular close logic without any time budget.
+ /// </summary>
+ /// <remarks>
+ /// Abort can be requested at any time, including before a channel (and therefore a consumer,
+ /// session or connection) has ever been created; the close logic checks each of them for null.
+ /// </remarks>
+ protected override void OnAbort()
+ {
+     TimeSpan noWait = TimeSpan.Zero;
+     OnClose(noWait);
+ }
+
+ /// <summary>
+ /// Releases the consumer, the session and the connection held by this listener (in that order)
+ /// and then closes the queue of channels waiting to be accepted.
+ /// </summary>
+ /// <remarks>
+ /// Each resource is released even when releasing an earlier one throws, and the channel queue is
+ /// always closed. A field is cleared only after its resource has been released successfully, so
+ /// a second invocation (for example an abort following a failed close) skips whatever has
+ /// already been released and retries only what is left.
+ /// </remarks>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out. The value is not used by this implementation.</param>
+ protected override void OnClose(TimeSpan timeout)
+ {
+     lock(ThisLock)
+     {
+         try
+         {
+             ReleaseConsumer();
+         }
+         finally
+         {
+             try
+             {
+                 ReleaseSession();
+             }
+             finally
+             {
+                 try
+                 {
+                     ReleaseConnection();
+                 }
+                 finally
+                 {
+                     _channelQueue.Close();
+                 }
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Closes and disposes the message consumer, if there is one, and forgets it.
+ /// </summary>
+ private void ReleaseConsumer()
+ {
+     if(_consumer == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating consumer...");
+     _consumer.Close();
+     _consumer.Dispose();
+     _consumer = null;
+     Tracer.Debug("Listener has terminated consumer");
+ }
+
+ /// <summary>
+ /// Closes the session, if there is one, and forgets it.
+ /// </summary>
+ private void ReleaseSession()
+ {
+     if(_session == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating session...");
+     _session.Close();
+     _session = null;
+     Tracer.Debug("Listener has terminated session");
+ }
+
+ /// <summary>
+ /// Stops, closes and disposes the connection, if there is one, and forgets it.
+ /// </summary>
+ private void ReleaseConnection()
+ {
+     if(_connection == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating connection...");
+     _connection.Stop();
+     _connection.Close();
+     _connection.Dispose();
+     _connection = null;
+     Tracer.Debug("Listener has terminated connection");
+ }
+
+ /// <summary>
+ /// Starts an asynchronous close. The close work itself is performed synchronously by
+ /// <c>OnClose</c> before this method returns.
+ /// </summary>
+ /// <returns>
+ /// A <see cref="CompletedAsyncResult"/> created from <paramref name="callback"/> and <paramref name="state"/>.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the close operation has to complete; passed on to <c>OnClose</c>.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate handed to the returned result.</param>
+ /// <param name="state">An application-specified state object handed to the returned result.</param>
+ protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     OnClose(timeout);
+
+     IAsyncResult completed = new CompletedAsyncResult(callback, state);
+     return completed;
+ }
+
+ /// <summary>
+ /// Finishes the asynchronous close that was started by <c>OnBeginClose</c>.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginClose</c>.</param>
+ protected override void OnEndClose(IAsyncResult result)
+ {
+     // All of the work was done in OnBeginClose; only the result object needs to be ended.
+     CompletedAsyncResult.End(result);
+ }
+
+ /// <summary>
+ /// Verifies that the listener can be opened: a listen URI must be present and the
+ /// <paramref name="timeout"/> must pass <c>NmsChannelHelper.ValidateTimeout</c>.
+ /// </summary>
+ /// <remarks>
+ /// No broker resources are created here; they are created when the first channel is needed.
+ /// </remarks>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+ /// <exception cref="T:System.InvalidOperationException">The listener has no URI.</exception>
+ protected override void OnOpen(TimeSpan timeout)
+ {
+     Uri listenUri = Uri;
+     if(listenUri == null)
+     {
+         throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+     }
+
+     NmsChannelHelper.ValidateTimeout(timeout);
+ }
+
+ /// <summary>
+ /// Starts an asynchronous open. The timeout is validated and <c>OnOpen</c> is run
+ /// synchronously before this method returns.
+ /// </summary>
+ /// <returns>
+ /// A <see cref="CompletedAsyncResult"/> created from <paramref name="callback"/> and <paramref name="state"/>.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the open operation has to complete; passed on to <c>OnOpen</c>.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate handed to the returned result.</param>
+ /// <param name="state">An application-specified state object handed to the returned result.</param>
+ protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     // The timeout is checked here first, so an invalid timeout is reported before OnOpen
+     // looks at the URI.
+     NmsChannelHelper.ValidateTimeout(timeout);
+     OnOpen(timeout);
+
+     IAsyncResult completed = new CompletedAsyncResult(callback, state);
+     return completed;
+ }
+
+ /// <summary>
+ /// Finishes the asynchronous open that was started by <c>OnBeginOpen</c>.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginOpen</c>.</param>
+ protected override void OnEndOpen(IAsyncResult result)
+ {
+     // All of the work was done in OnBeginOpen; only the result object needs to be ended.
+     CompletedAsyncResult.End(result);
+ }
+
+ #endregion
+
+ #region Implementation of ChannelListenerBase
+
+ /// <summary>
+ /// Gets the URI on which this listener listens for incoming channels.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.Uri" /> that was built in the constructor from the binding context's listen base and relative addresses.
+ /// </returns>
+ public override Uri Uri
+ {
+     get
+     {
+         return _uri;
+     }
+ }
+
+ /// <summary>
+ /// Waits for a channel to become available in the listener's channel queue.
+ /// </summary>
+ /// <returns>
+ /// The value returned by the channel queue's <c>WaitForItem</c> for the given <paramref name="timeout" />.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long to wait; validated with <c>NmsChannelHelper.ValidateTimeout</c> first.</param>
+ protected override bool OnWaitForChannel(TimeSpan timeout)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     bool channelArrived = _channelQueue.WaitForItem(timeout);
+     return channelArrived;
+ }
+
+ /// <summary>
+ /// Starts an asynchronous wait for a channel to become available in the listener's channel queue.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> produced by the channel queue's <c>BeginWaitForItem</c>.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long to wait; validated with <c>NmsChannelHelper.ValidateTimeout</c> first.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed on to the channel queue.</param>
+ /// <param name="state">An application-specified state object passed on to the channel queue.</param>
+ protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     IAsyncResult pendingWait = _channelQueue.BeginWaitForItem(timeout, callback, state);
+     return pendingWait;
+ }
+
+ /// <summary>
+ /// Finishes the asynchronous wait that was started by <c>OnBeginWaitForChannel</c>.
+ /// </summary>
+ /// <returns>
+ /// The value returned by the channel queue's <c>EndWaitForItem</c>.
+ /// </returns>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginWaitForChannel</c>.</param>
+ protected override bool OnEndWaitForChannel(IAsyncResult result)
+ {
+     bool channelArrived = _channelQueue.EndWaitForItem(result);
+     return channelArrived;
+ }
+
+ /// <summary>
+ /// Accepts the next channel from the listener's channel queue, first making sure (unless the
+ /// listener is disposed) that a channel exists and has been queued.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.ServiceModel.Channels.IInputChannel" /> taken from the channel queue.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+ /// <exception cref="T:System.TimeoutException">No channel could be dequeued within <paramref name="timeout" />.</exception>
+ protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
+ {
+     Tracer.Debug("Accepting channel");
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     bool listenerUsable = !IsDisposed;
+     if(listenerUsable)
+     {
+         EnsureChannelAvailable();
+     }
+
+     IInputChannel accepted;
+     bool dequeued = _channelQueue.Dequeue(timeout, out accepted);
+     if(!dequeued)
+     {
+         throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
+     }
+
+     return accepted;
+ }
+
+ /// <summary>
+ /// Starts an asynchronous accept, first making sure (unless the listener is disposed) that a
+ /// channel exists and has been queued.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> produced by the channel queue's <c>BeginDequeue</c>.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed on to the channel queue.</param>
+ /// <param name="state">An application-specified state object passed on to the channel queue.</param>
+ protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     bool listenerUsable = !IsDisposed;
+     if(listenerUsable)
+     {
+         EnsureChannelAvailable();
+     }
+
+     IAsyncResult pendingAccept = _channelQueue.BeginDequeue(timeout, callback, state);
+     return pendingAccept;
+ }
+
+ /// <summary>
+ /// Finishes the asynchronous accept that was started by <c>OnBeginAcceptChannel</c>.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.ServiceModel.Channels.IInputChannel" /> taken from the channel queue.
+ /// </returns>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginAcceptChannel</c>.</param>
+ /// <exception cref="T:System.TimeoutException">The channel queue reported that no channel was dequeued.</exception>
+ protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
+ {
+     IInputChannel accepted;
+     bool dequeued = _channelQueue.EndDequeue(result, out accepted);
+     if(!dequeued)
+     {
+         throw new TimeoutException();
+     }
+
+     return accepted;
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Callback-shaped entry point for <see cref="Dispatch"/>: casts the untyped
+ /// <paramref name="state"/> to a <see cref="Message"/> and dispatches it.
+ /// </summary>
+ /// <param name="state">The message to dispatch, passed as an <see cref="object"/>.</param>
+ internal void DispatchCallback(object state)
+ {
+     Message message = (Message) state;
+     Dispatch(message);
+ }
+
+ /// <summary>
+ /// Hands an incoming message to the listener's current channel, creating that channel first
+ /// when none exists. A channel that was created by this call is afterwards placed on the
+ /// channel queue so that a pending or future accept can pick it up.
+ /// </summary>
+ /// <remarks>
+ /// A null <paramref name="message"/> is ignored. Any exception raised while dispatching is
+ /// logged through the tracer and not rethrown.
+ /// </remarks>
+ /// <param name="message">The message to dispatch; may be null.</param>
+ internal void Dispatch(Message message)
+ {
+     if(message != null)
+     {
+         try
+         {
+             NmsInputChannel targetChannel;
+             bool isNewChannel = CreateOrRetrieveChannel(out targetChannel);
+
+             Tracer.Debug("Dispatching incoming message");
+             targetChannel.Dispatch(message);
+
+             // Only a channel created by this call still has to be handed to whoever is
+             // waiting for AcceptChannel() to complete.
+             if(isNewChannel)
+             {
+                 Tracer.Debug("Handing off channel");
+                 _channelQueue.EnqueueAndDispatch(targetChannel);
+             }
+         }
+         catch(Exception e)
+         {
+             Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString());
+         }
+     }
+ }
+
+ /// <summary>
+ /// Returns the listener's current channel, creating and registering a new one when there is none.
+ /// </summary>
+ /// <remarks>
+ /// The current channel is read once without the lock and, when that read finds nothing, again
+ /// under <c>_currentChannelLock</c> before a channel is created. A newly created channel gets
+ /// <c>OnChannelClosed</c> attached to its <c>Closed</c> event.
+ /// </remarks>
+ /// <param name="newChannel">Receives the current channel, whether it already existed or was created by this call.</param>
+ /// <returns>true if this call created the channel; false if an existing channel was returned.</returns>
+ private bool CreateOrRetrieveChannel(out NmsInputChannel newChannel)
+ {
+     newChannel = _currentChannel;
+     if(newChannel != null)
+     {
+         return false;
+     }
+
+     lock(_currentChannelLock)
+     {
+         newChannel = _currentChannel;
+         if(newChannel != null)
+         {
+             return false;
+         }
+
+         newChannel = CreateNmsChannel(Uri);
+         newChannel.Closed += OnChannelClosed;
+         _currentChannel = newChannel;
+         return true;
+     }
+ }
+
+ /// <summary>
+ /// Handles the <c>Closed</c> event of a channel created by this listener: when the closed
+ /// channel is the current one, the listener forgets it.
+ /// </summary>
+ /// <param name="sender">The channel that was closed; must be an <see cref="NmsInputChannel"/>.</param>
+ /// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data (not used).</param>
+ private void OnChannelClosed(object sender, EventArgs args)
+ {
+     NmsInputChannel closedChannel = (NmsInputChannel) sender;
+
+     lock(_currentChannelLock)
+     {
+         bool isCurrent = (closedChannel == _currentChannel);
+         if(isCurrent)
+         {
+             _currentChannel = null;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Creates the <see cref="NmsInputChannel" /> that will wait for inbound messages, opening the
+ /// connection, session and consumer it depends on when they do not exist yet.
+ /// </summary>
+ /// <remarks>
+ /// This method runs again whenever a new channel is needed after the previous one was closed.
+ /// The connection, session and consumer held by the listener are therefore reused when present:
+ /// opening fresh ones on every call would overwrite the fields without closing the old
+ /// resources, leaking a connection and leaving a second consumer attached to the destination.
+ /// The destination is resolved (from the current <see cref="Destination"/> and
+ /// <see cref="DestinationType"/> values) only when a consumer has to be created.
+ /// </remarks>
+ /// <param name="uri">The URI for the message queue.</param>
+ /// <returns>A new channel bound to this listener and addressed at <paramref name="uri"/>.</returns>
+ private NmsInputChannel CreateNmsChannel(Uri uri)
+ {
+     if(_connection == null)
+     {
+         _connection = OpenConnection(uri);
+     }
+
+     if(_session == null)
+     {
+         _session = OpenSession(_connection);
+     }
+
+     if(_consumer == null)
+     {
+         _destination = SessionUtil.GetDestination(_session, Destination, DestinationType);
+         _consumer = CreateConsumer(_session, _destination);
+     }
+
+     EndpointAddress address = new EndpointAddress(uri);
+     return new NmsInputChannel(this, address);
+ }
+
+ /// <summary>
+ /// Creates a connection for the given URI through the <c>ConnectionFactoryManager</c>,
+ /// attaches <c>OnExceptionThrown</c> as its exception listener and starts it.
+ /// </summary>
+ /// <param name="uri">The URI identifying the message broker to connect to.</param>
+ /// <returns>The started connection.</returns>
+ private IConnection OpenConnection(Uri uri)
+ {
+     IConnection brokerConnection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+
+     // Subscribe before starting so that the handler is already in place when the connection runs.
+     brokerConnection.ExceptionListener += OnExceptionThrown;
+     brokerConnection.Start();
+
+     Tracer.Debug("Connection open");
+     return brokerConnection;
+ }
+
+ /// <summary>
+ /// Creates a session on an already started connection.
+ /// </summary>
+ /// <param name="connection">The connection to the message broker; it must have been started.</param>
+ /// <returns>The newly created session.</returns>
+ /// <exception cref="InvalidOperationException">The <paramref name="connection" /> reports that it
+ /// has not been started.</exception>
+ private ISession OpenSession(IConnection connection)
+ {
+     bool started = connection.IsStarted;
+     if(!started)
+     {
+         throw new InvalidOperationException("The connection has not yet been opened");
+     }
+
+     Tracer.Debug("Opening session...");
+     ISession openedSession = connection.CreateSession();
+     Tracer.Debug("Session open");
+
+     return openedSession;
+ }
+
+ /// <summary>
+ /// Creates a consumer for <paramref name="destination"/> on <paramref name="session"/> and
+ /// attaches <c>OnReceiveMessage</c> as its message listener.
+ /// </summary>
+ /// <param name="session">The session on which the consumer is created.</param>
+ /// <param name="destination">The destination to consume from.</param>
+ /// <returns>The consumer, with the listener already attached.</returns>
+ private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
+ {
+     Tracer.Debug("Creating message listener...");
+
+     IMessageConsumer messageConsumer = session.CreateConsumer(destination);
+     messageConsumer.Listener += OnReceiveMessage;
+
+     Tracer.Debug("Created message listener");
+     return messageConsumer;
+ }
+
+ /// <summary>
+ /// Event handler that processes a message received by the consumer: the text of the message is
+ /// turned into bytes, decoded into a WCF <see cref="Message"/> by the listener's message
+ /// encoder, and dispatched.
+ /// </summary>
+ /// <remarks>
+ /// The text is encoded as UTF-8 so that characters outside the ASCII range survive the trip
+ /// (ASCII encoding would replace each of them with '?'); for pure ASCII text the produced bytes
+ /// are identical. Messages that are not text messages, or that carry no text, are logged and
+ /// skipped so that an unexpected message does not raise an exception into the consumer's
+ /// callback.
+ /// </remarks>
+ /// <param name="message">The message delivered by the consumer.</param>
+ private void OnReceiveMessage(IMessage message)
+ {
+     Tracer.Debug("Decoding message");
+
+     ITextMessage textMessage = message as ITextMessage;
+     string soapMsg = (textMessage != null) ? textMessage.Text : null;
+     if(soapMsg == null)
+     {
+         Tracer.ErrorFormat("Discarding message that is not a text message or has no text: {0}", message);
+         return;
+     }
+
+     // Encode straight into a pooled buffer; the pooled array may be larger than requested,
+     // so the segment records how many bytes are actually in use.
+     int dataLength = Encoding.UTF8.GetByteCount(soapMsg);
+     byte[] pooledBuffer = _bufferManager.TakeBuffer(dataLength);
+     Encoding.UTF8.GetBytes(soapMsg, 0, soapMsg.Length, pooledBuffer, 0);
+
+     ArraySegment<byte> data = new ArraySegment<byte>(pooledBuffer, 0, dataLength);
+     Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+
+     Tracer.Debug(msg);
+     Dispatch(msg);
+ }
+
+ /// <summary>
+ /// Exception listener attached to the connection.
+ /// </summary>
+ /// <remarks>
+ /// An <see cref="NMSException" /> is logged (type name, message and stack trace) and swallowed;
+ /// any other exception is thrown from this handler.
+ /// </remarks>
+ /// <param name="exception">The exception reported by the connection.</param>
+ private void OnExceptionThrown(Exception exception)
+ {
+     bool isNmsFailure = exception is NMSException;
+     if(!isNmsFailure)
+     {
+         // TODO: can we recover from the exception? Do we convert to WCF exceptions?
+         throw exception;
+     }
+
+     Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
+         exception.GetType().Name,
+         exception.Message,
+         exception.StackTrace);
+ }
+
+ /// <summary>
+ /// Makes sure the listener has a current channel; a channel that had to be created by this
+ /// call is placed on the channel queue so that it can be accepted.
+ /// </summary>
+ private void EnsureChannelAvailable()
+ {
+     NmsInputChannel channel;
+     bool createdNow = CreateOrRetrieveChannel(out channel);
+
+     // An already existing channel was queued when it was created, so only a new one is queued.
+     if(createdNow)
+     {
+         _channelQueue.EnqueueAndDispatch(channel);
+     }
+ }
+
+ #region Private members
+
+ // Listen address, built in the constructor from the binding context's base and relative addresses.
+ private readonly Uri _uri;
+ // Connection opened by CreateNmsChannel; stopped, closed and disposed in OnClose.
+ private IConnection _connection;
+ // Session created on _connection by CreateNmsChannel; closed in OnClose.
+ private ISession _session;
+ // Destination resolved by CreateNmsChannel from the Destination and DestinationType properties.
+ private IDestination _destination;
+ // Consumer whose Listener event is wired to OnReceiveMessage; closed and disposed in OnClose.
+ private IMessageConsumer _consumer;
+ // Channels created by the listener that are waiting to be handed out by the accept methods.
+ private readonly InputQueue<IInputChannel> _channelQueue;
+ // The channel that incoming messages are dispatched to; reset to null when that channel closes.
+ private NmsInputChannel _currentChannel;
+ // Guards creation and clearing of _currentChannel.
+ private readonly object _currentChannelLock;
+ // Supplies the encoder that turns received bytes into WCF messages (see OnReceiveMessage).
+ private readonly MessageEncoderFactory _messageEncoderFactory;
+ // Supplies the buffers that hold received message bytes (see OnReceiveMessage).
+ private readonly BufferManager _bufferManager;
+ // Backing field of the Destination property; initialised from the transport binding element.
+ private string _destinationName;
+ // Backing field of the DestinationType property; initialised from the transport binding element.
+ private DestinationType _destinationType;
+
+ #endregion
+ }
+}