You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/12/12 19:25:52 UTC

svn commit: r726083 [2/2] - in /activemq/activemq-dotnet/Apache.NMS.WCF/trunk: ./ src/main/csharp/ src/main/csharp/Configuration/

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Base class for NMS input channels. Items of type <typeparamref name="T"/> handed to
+	/// <see cref="Dispatch"/> are placed on an internal input queue, from which they can be
+	/// removed with the dequeue methods. The queue is opened when the channel opens and
+	/// closed when the channel is closed or aborted.
+	/// </summary>
+	/// <typeparam name="T">The type of item carried by the channel's queue.</typeparam>
+	public abstract class NmsInputQueueChannelBase<T> : ChannelBase where T : class
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsInputQueueChannelBase&lt;T&gt;"/> class
+		/// with an empty input queue.
+		/// </summary>
+		/// <param name="factory">The factory that was used to create the channel.</param>
+		/// <param name="localAddress">The local address of the channel.</param>
+		public NmsInputQueueChannelBase(ChannelListenerBase factory, EndpointAddress localAddress)
+			: base(factory)
+		{
+			_localAddress = localAddress;
+			_messageQueue = new InputQueue<T>();
+		}
+
+		#endregion
+
+		#region Public properties
+
+		/// <summary>
+		/// Gets the local address supplied when the channel was constructed.
+		/// </summary>
+		/// <value>The local address.</value>
+		public EndpointAddress LocalAddress
+		{
+			get { return _localAddress; }
+		}
+
+		#endregion
+
+		#region Messaging
+
+		/// <summary>
+		/// Gets the pending count reported by the input queue.
+		/// </summary>
+		/// <value>The pending message count.</value>
+		public int PendingMessageCount
+		{
+			get { return _messageQueue.PendingCount; }
+		}
+
+		/// <summary>
+		/// Places the specified request on the input queue and dispatches it.
+		/// </summary>
+		/// <param name="request">The request.</param>
+		/// <remarks>
+		/// <c>ThrowIfDisposedOrNotOpen</c> is called first, so the channel must be open.
+		/// </remarks>
+		public void Dispatch(T request)
+		{
+			ThrowIfDisposedOrNotOpen();
+			_messageQueue.EnqueueAndDispatch(request);
+		}
+
+		/// <summary>
+		/// Begins the dequeue operation.
+		/// </summary>
+		/// <param name="timeout">The timeout passed to the queue.</param>
+		/// <param name="callback">The callback.</param>
+		/// <param name="state">The state.</param>
+		/// <returns>
+		/// The queue's asynchronous result when the channel is in the
+		/// <see cref="CommunicationState.Opened"/> state; otherwise a new
+		/// <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>.
+		/// </returns>
+		public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			// Unlike the other dequeue methods this one does not throw when the channel
+			// is not open; it hands back a CompletedAsyncResult instead.
+			if(State == CommunicationState.Opened)
+			{
+				return _messageQueue.BeginDequeue(timeout, callback, state);
+			}
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Ends the dequeue operation.
+		/// </summary>
+		/// <param name="result">The result returned by <see cref="BeginDequeue"/>.</param>
+		/// <returns>The item returned by the queue.</returns>
+		public T EndDequeue(IAsyncResult result)
+		{
+			ThrowIfDisposedOrNotOpen();
+			return _messageQueue.EndDequeue(result);
+		}
+
+		/// <summary>
+		/// Dequeues the next message.
+		/// </summary>
+		/// <param name="timeout">The timeout passed to the queue.</param>
+		/// <returns>The item returned by the queue.</returns>
+		public T Dequeue(TimeSpan timeout)
+		{
+			ThrowIfDisposedOrNotOpen();
+			return _messageQueue.Dequeue(timeout);
+		}
+
+		/// <summary>
+		/// Tries to extract the dequeued message from an asynchronous result.
+		/// </summary>
+		/// <param name="result">The asynchronous result to inspect.</param>
+		/// <param name="message">
+		/// Receives the message taken from <paramref name="result"/>; remains <c>null</c> when
+		/// the result is not a <c>TypedAsyncResult</c> and completed synchronously.
+		/// </param>
+		/// <returns>The value of <see cref="IAsyncResult.IsCompleted"/> for <paramref name="result"/>.</returns>
+		/// <exception cref="T:System.ArgumentNullException"><paramref name="result"/> is <c>null</c>.</exception>
+		public bool TryDequeue(IAsyncResult result, out T message)
+		{
+			if(result == null)
+			{
+				throw new ArgumentNullException("result");
+			}
+
+			message = null;
+			if(result is TypedAsyncResult<T>)
+			{
+				message = TypedAsyncResult<T>.End(result);
+			}
+			else if(!result.CompletedSynchronously)
+			{
+				// NOTE(review): any value returned by AsyncQueueReader.End is ignored here,
+				// as in the original code; confirm callers do not need it.
+				InputQueue<T>.AsyncQueueReader.End(result, out message);
+			}
+			return result.IsCompleted;
+		}
+
+		#endregion
+
+		#region Abort
+
+		/// <summary>
+		/// Closes the input queue when the channel is aborted.
+		/// </summary>
+		protected override void OnAbort()
+		{
+			_messageQueue.Close();
+		}
+
+		#endregion
+
+		#region Open
+
+		/// <summary>
+		/// Handles an asynchronous open by running <see cref="OnOpen"/> synchronously.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+		/// <returns>
+		/// A new <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>.
+		/// </returns>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnOpen(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Opens the input queue when the channel is opened.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> allotted for the open operation; not used by this implementation.</param>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+			_messageQueue.Open();
+		}
+
+		/// <summary>
+		/// Completes an asynchronous open of the channel.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by <see cref="OnBeginOpen"/>.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region Close
+
+		/// <summary>
+		/// Handles an asynchronous close by running <see cref="OnClose"/> synchronously.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+		/// <returns>
+		/// A new <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>.
+		/// </returns>
+		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnClose(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Closes the input queue when the channel is closed.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> allotted for the close operation; not used by this implementation.</param>
+		protected override void OnClose(TimeSpan timeout)
+		{
+			_messageQueue.Close();
+		}
+
+		/// <summary>
+		/// Completes an asynchronous close of the channel.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by <see cref="OnBeginClose"/>.</param>
+		protected override void OnEndClose(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region GetProperty
+
+		/// <summary>
+		/// Gets a typed property object for the channel.
+		/// </summary>
+		/// <typeparam name="P">The type of property requested.</typeparam>
+		/// <returns>
+		/// The default fault converter for <see cref="MessageVersion.Soap12WSAddressing10"/> when
+		/// <typeparamref name="P"/> is <see cref="FaultConverter"/>; otherwise whatever the base
+		/// class returns.
+		/// </returns>
+		public override P GetProperty<P>()
+		{
+			if(typeof(P) == typeof(FaultConverter))
+			{
+				return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as P;
+			}
+			return base.GetProperty<P>();
+		}
+
+		#endregion
+
+		#region Private members
+
+		// Both fields are assigned once, in the constructor.
+		private readonly EndpointAddress _localAddress;
+		private readonly InputQueue<T> _messageQueue;
+
+		#endregion
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Sessioned variant of <see cref="NmsInputChannel"/>: an input channel that also
+	/// exposes an <see cref="IInputSession"/> carrying a session ID.
+	/// </summary>
+	public class NmsInputSessionChannel : NmsInputChannel, IInputSessionChannel
+	{
+		#region Private members
+
+		// Session object created together with the channel instance.
+		private IInputSession _session = new InputSession();
+
+		#endregion
+
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsInputSessionChannel"/> class.
+		/// Both arguments are forwarded unchanged to the base class constructor.
+		/// </summary>
+		/// <param name="factory">The factory that was used to create the channel.</param>
+		/// <param name="localAddress">The local address of the channel.</param>
+		internal NmsInputSessionChannel(ChannelListenerBase factory, EndpointAddress localAddress)
+			: base(factory, localAddress)
+		{
+		}
+
+		#endregion
+
+		#region ISessionChannel<IInputSession> Members
+
+		/// <summary>
+		/// Gets the session associated with this channel.
+		/// </summary>
+		/// <value>The session object created for this channel instance.</value>
+		public IInputSession Session
+		{
+			get
+			{
+				return _session;
+			}
+		}
+
+		/// <summary>
+		/// Session implementation whose only state is its ID.
+		/// </summary>
+		private class InputSession : IInputSession, System.ServiceModel.Channels.ISession
+		{
+			// Obtained from NmsChannelHelper.CreateUniqueSessionId() when the session is constructed.
+			private string _sessionId = NmsChannelHelper.CreateUniqueSessionId();
+
+			/// <summary>
+			/// Gets the ID assigned to the session at construction.
+			/// </summary>
+			/// <value>The session ID.</value>
+			public string Id
+			{
+				get
+				{
+					return _sessionId;
+				}
+			}
+		}
+
+		#endregion
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Server-side listener for sessioned input channels.
+	/// </summary>
+	public class NmsInputSessionChannelListener : ChannelListenerBase<IInputSessionChannel>, IChannel
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Creates a listener configured from the given transport binding element and binding context.
+		/// </summary>
+		/// <param name="transportElement">The binding element supplying the buffer sizes, the destination name and the destination type.</param>
+		/// <param name="context">The context supplying the binding, the binding parameters and the listen URI parts.</param>
+		internal NmsInputSessionChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
+			: base(context.Binding)
+		{
+			// The maximum received message size is narrowed to int by an explicit cast.
+			_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize);
+
+			// Remove the encoding element from the binding parameters; fall back to the
+			// default encoder factory when none was supplied.
+			MessageEncodingBindingElement encodingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+			if(encodingElement == null)
+			{
+				_messageEncoderFactory = NmsConstants.DefaultMessageEncoderFactory;
+			}
+			else
+			{
+				_messageEncoderFactory = encodingElement.CreateMessageEncoderFactory();
+			}
+
+			_channelQueue = new InputQueue<IInputSessionChannel>();
+			_currentChannelLock = new object();
+			_destinationName = transportElement.Destination;
+			_destinationType = transportElement.DestinationType;
+			_uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+			Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+		}
+
+		#endregion
+
+		#region Public properties
+
+		/// <summary>
+		/// Gets the message encoder factory selected when the listener was constructed.
+		/// </summary>
+		/// <value>The message encoder factory.</value>
+		public MessageEncoderFactory MessageEncoderFactory
+		{
+			get
+			{
+				return _messageEncoderFactory;
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the name of the destination.
+		/// </summary>
+		/// <value>The destination name.</value>
+		public string Destination
+		{
+			get
+			{
+				return _destinationName;
+			}
+			set
+			{
+				_destinationName = value;
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the destination type.
+		/// </summary>
+		/// <value>The destination type.</value>
+		public DestinationType DestinationType
+		{
+			get
+			{
+				return _destinationType;
+			}
+			set
+			{
+				_destinationType = value;
+			}
+		}
+
+		#endregion
+
+		#region Implementation of CommunicationObject
+
+		/// <summary>
+		/// Handles an abort of the listener by running the same clean-up as a close,
+		/// with a zero timeout.
+		/// </summary>
+		/// <remarks>
+		/// <see cref="OnClose"/> checks the consumer, session and connection for null before
+		/// closing them, so this also works when they were never created.
+		/// </remarks>
+		protected override void OnAbort()
+		{
+			TimeSpan noWait = TimeSpan.Zero;
+			OnClose(noWait);
+		}
+
+		/// <summary>
+		/// Releases the consumer, the session and the connection (in that order) and then
+		/// closes the channel queue, all while holding <c>ThisLock</c>.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> allotted for the close operation; not used by this implementation.</param>
+		/// <remarks>
+		/// Each step runs even when an earlier step throws, so a failure while closing the
+		/// consumer or session no longer leaves the connection or the channel queue open.
+		/// Each field is cleared after its release attempt, so a second call (for example
+		/// from <see cref="OnAbort"/>) skips resources that were already handled.
+		/// </remarks>
+		protected override void OnClose(TimeSpan timeout)
+		{
+			lock(ThisLock)
+			{
+				try
+				{
+					if(_consumer != null)
+					{
+						Tracer.Debug("Listener is terminating consumer...");
+						_consumer.Close();
+						_consumer.Dispose();
+						Tracer.Debug("Listener has terminated consumer");
+					}
+				}
+				finally
+				{
+					_consumer = null;
+					try
+					{
+						if(_session != null)
+						{
+							Tracer.Debug("Listener is terminating session...");
+							_session.Close();
+							Tracer.Debug("Listener has terminated session");
+						}
+					}
+					finally
+					{
+						_session = null;
+						try
+						{
+							if(_connection != null)
+							{
+								Tracer.Debug("Listener is terminating connection...");
+								_connection.Stop();
+								_connection.Close();
+								_connection.Dispose();
+								Tracer.Debug("Listener has terminated connection");
+							}
+						}
+						finally
+						{
+							_connection = null;
+							// Always close the queue, whatever happened above.
+							_channelQueue.Close();
+						}
+					}
+				}
+			}
+		}
+
+		/// <summary>
+		/// Handles an asynchronous close by running <see cref="OnClose"/> synchronously.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+		/// <returns>
+		/// A new <c>CompletedAsyncResult</c> built from <paramref name="callback" /> and <paramref name="state" />.
+		/// </returns>
+		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			// The close itself has finished by the time the result object is created.
+			OnClose(timeout);
+			IAsyncResult closed = new CompletedAsyncResult(callback, state);
+			return closed;
+		}
+
+		/// <summary>
+		/// Completes an asynchronous close of the listener.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by <see cref="OnBeginClose"/>.</param>
+		protected override void OnEndClose(IAsyncResult result)
+		{
+			// Hand the result to CompletedAsyncResult.End to finish the operation.
+			CompletedAsyncResult.End(result);
+		}
+
+		/// <summary>
+		/// Checks the listener's preconditions when it is opened: a listen URI must be set,
+		/// and the timeout is passed to <c>NmsChannelHelper.ValidateTimeout</c>.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <exception cref="T:System.InvalidOperationException">The listen URI is <c>null</c>.</exception>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+			bool uriMissing = (Uri == null);
+			if(uriMissing)
+			{
+				throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+			}
+
+			NmsChannelHelper.ValidateTimeout(timeout);
+		}
+
+		/// <summary>
+		/// Handles an asynchronous open by validating the timeout and then running
+		/// <see cref="OnOpen"/> synchronously.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+		/// <returns>
+		/// A new <c>CompletedAsyncResult</c> built from <paramref name="callback" /> and <paramref name="state" />.
+		/// </returns>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			// The timeout is validated here and again inside OnOpen.
+			NmsChannelHelper.ValidateTimeout(timeout);
+			OnOpen(timeout);
+
+			IAsyncResult opened = new CompletedAsyncResult(callback, state);
+			return opened;
+		}
+
+		/// <summary>
+		/// Completes an asynchronous open of the listener.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by <see cref="OnBeginOpen"/>.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			// Hand the result to CompletedAsyncResult.End to finish the operation.
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region Implementation of ChannelListenerBase
+
+		/// <summary>
+		/// Gets the URI on which this listener listens, as built in the constructor from
+		/// the binding context's base and relative listen addresses.
+		/// </summary>
+		/// <value>The listen <see cref="T:System.Uri" />.</value>
+		public override Uri Uri
+		{
+			get
+			{
+				return _uri;
+			}
+		}
+
+		/// <summary>
+		/// Waits for an item to become available on the channel queue.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> passed to the queue's wait; validated first with <c>NmsChannelHelper.ValidateTimeout</c>.</param>
+		/// <returns>The value returned by the channel queue's <c>WaitForItem</c>.</returns>
+		protected override bool OnWaitForChannel(TimeSpan timeout)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			bool itemAvailable = _channelQueue.WaitForItem(timeout);
+			return itemAvailable;
+		}
+
+		/// <summary>
+		/// Begins an asynchronous wait for an item to become available on the channel queue.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> passed to the queue's wait; validated first with <c>NmsChannelHelper.ValidateTimeout</c>.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate forwarded to the queue.</param>
+		/// <param name="state">An object, specified by the application, that is forwarded to the queue.</param>
+		/// <returns>The <see cref="T:System.IAsyncResult" /> returned by the channel queue's <c>BeginWaitForItem</c>.</returns>
+		protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			IAsyncResult pendingWait = _channelQueue.BeginWaitForItem(timeout, callback, state);
+			return pendingWait;
+		}
+
+		/// <summary>
+		/// Completes an asynchronous wait started by <see cref="OnBeginWaitForChannel"/>.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by <see cref="OnBeginWaitForChannel"/>.</param>
+		/// <returns>The value returned by the channel queue's <c>EndWaitForItem</c>.</returns>
+		protected override bool OnEndWaitForChannel(IAsyncResult result)
+		{
+			bool itemAvailable = _channelQueue.EndWaitForItem(result);
+			return itemAvailable;
+		}
+
+		/// <summary>
+		/// Accepts the next channel from the channel queue, waiting up to the given timeout.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		/// <returns>The channel taken from the channel queue.</returns>
+		/// <exception cref="T:System.TimeoutException">The channel queue's <c>Dequeue</c> returned <c>false</c>.</exception>
+		protected override IInputSessionChannel OnAcceptChannel(TimeSpan timeout)
+		{
+			Tracer.Debug("Accepting channel");
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			// Only try to make a channel available while the listener is not disposed.
+			if(!IsDisposed)
+			{
+				EnsureChannelAvailable();
+			}
+
+			IInputSessionChannel accepted;
+			bool dequeued = _channelQueue.Dequeue(timeout, out accepted);
+			if(!dequeued)
+			{
+				throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
+			}
+			return accepted;
+		}
+
+		/// <summary>
+		/// Begins an asynchronous accept by starting a dequeue on the channel queue.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate forwarded to the queue.</param>
+		/// <param name="state">An object, specified by the application, that is forwarded to the queue.</param>
+		/// <returns>The <see cref="T:System.IAsyncResult" /> returned by the channel queue's <c>BeginDequeue</c>.</returns>
+		protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			// Only try to make a channel available while the listener is not disposed.
+			bool listenerUsable = !IsDisposed;
+			if(listenerUsable)
+			{
+				EnsureChannelAvailable();
+			}
+
+			IAsyncResult pendingAccept = _channelQueue.BeginDequeue(timeout, callback, state);
+			return pendingAccept;
+		}
+
+		/// <summary>
+		/// Completes an asynchronous accept started by <see cref="OnBeginAcceptChannel"/>.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by <see cref="OnBeginAcceptChannel"/>.</param>
+		/// <returns>The channel taken from the channel queue.</returns>
+		/// <exception cref="T:System.TimeoutException">The channel queue's <c>EndDequeue</c> returned <c>false</c>.</exception>
+		protected override IInputSessionChannel OnEndAcceptChannel(IAsyncResult result)
+		{
+			IInputSessionChannel channel;
+			if(_channelQueue.EndDequeue(result, out channel))
+			{
+				return channel;
+			}
+
+			// Name the listener address, as the synchronous accept does, so that the
+			// failure can be traced back to a listener. The timeout value is not available here.
+			throw new TimeoutException(String.Format("Accept on listener at address {0} timed out.", Uri.AbsoluteUri));
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Callback-shaped entry point for <see cref="Dispatch"/>: casts the state object to
+		/// a <see cref="Message"/> and dispatches it.
+		/// </summary>
+		/// <param name="state">The message to dispatch, passed as an untyped state object.</param>
+		internal void DispatchCallback(object state)
+		{
+			Message incoming = (Message) state;
+			Dispatch(incoming);
+		}
+
+		/// <summary>
+		/// Hands an incoming message to the current channel, creating that channel first when
+		/// none exists. A newly created channel is afterwards placed on the channel queue.
+		/// A <c>null</c> message is ignored. Any exception raised while dispatching is traced
+		/// as an error and not rethrown.
+		/// </summary>
+		/// <param name="message">The message to dispatch; may be <c>null</c>.</param>
+		internal void Dispatch(Message message)
+		{
+			if(message != null)
+			{
+				try
+				{
+					NmsInputSessionChannel target;
+					bool isNewChannel = CreateOrRetrieveChannel(out target);
+
+					Tracer.Debug("Dispatching incoming message");
+					target.Dispatch(message);
+
+					if(isNewChannel)
+					{
+						// A freshly created channel is queued so that a pending accept can pick it up.
+						Tracer.Debug("Handing off channel");
+						_channelQueue.EnqueueAndDispatch(target);
+					}
+				}
+				catch(Exception e)
+				{
+					Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString());
+				}
+			}
+		}
+
+		/// <summary>
+		/// Returns the current channel, creating and registering a new one when there is none.
+		/// </summary>
+		/// <param name="newChannel">Receives the current channel, whether it already existed or was just created.</param>
+		/// <returns><c>true</c> when this call created the channel; <c>false</c> when an existing channel was returned.</returns>
+		private bool CreateOrRetrieveChannel(out NmsInputSessionChannel newChannel)
+		{
+			// First look without taking the lock.
+			newChannel = _currentChannel;
+			if(newChannel != null)
+			{
+				return false;
+			}
+
+			lock(_currentChannelLock)
+			{
+				// Look again now that the lock is held; another caller may have created the channel.
+				newChannel = _currentChannel;
+				if(newChannel != null)
+				{
+					return false;
+				}
+
+				newChannel = CreateNmsChannel(Uri);
+				newChannel.Closed += OnChannelClosed;
+				_currentChannel = newChannel;
+				return true;
+			}
+		}
+
+		/// <summary>
+		/// Handler for a channel's <c>Closed</c> event: forgets the current channel when it is
+		/// the one that closed.
+		/// </summary>
+		/// <param name="sender">The channel that closed; cast to <see cref="NmsInputSessionChannel"/>.</param>
+		/// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data.</param>
+		private void OnChannelClosed(object sender, EventArgs args)
+		{
+			NmsInputSessionChannel closedChannel = (NmsInputSessionChannel) sender;
+
+			lock(_currentChannelLock)
+			{
+				// Leave the field alone if a different channel has become current in the meantime.
+				if(_currentChannel == closedChannel)
+				{
+					_currentChannel = null;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Opens a connection, a session and a consumer for the configured destination, stores
+		/// them in the listener's fields, and creates the <see cref="NmsInputSessionChannel" />
+		/// for the given URI.
+		/// </summary>
+		/// <param name="uri">The URI used both to open the connection and as the channel's local address.</param>
+		/// <returns>A new channel whose factory is this listener.</returns>
+		private NmsInputSessionChannel CreateNmsChannel(Uri uri)
+		{
+			// NOTE(review): the four fields below are overwritten without releasing any
+			// previous values; confirm whether an earlier connection/consumer can still
+			// be live when this runs again after a channel has closed.
+			IConnection connection = OpenConnection(uri);
+			_connection = connection;
+
+			ISession session = OpenSession(connection);
+			_session = session;
+
+			_destination = SessionUtil.GetDestination(session, Destination, DestinationType);
+			_consumer = CreateConsumer(session, _destination);
+
+			return new NmsInputSessionChannel(this, new EndpointAddress(uri));
+		}
+
+		/// <summary>
+		/// Creates a connection for the given URI, attaches <c>OnExceptionThrown</c> to its
+		/// exception listener, and starts it.
+		/// </summary>
+		/// <param name="uri">The URI passed to the connection factory.</param>
+		/// <returns>The started connection.</returns>
+		/// <remarks>
+		/// If attaching the listener or starting the connection throws, the connection is
+		/// disposed before the exception is rethrown, so a failed start does not leave an
+		/// unreferenced connection behind.
+		/// </remarks>
+		private IConnection OpenConnection(Uri uri)
+		{
+			IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+			try
+			{
+				connection.ExceptionListener += OnExceptionThrown;
+				connection.Start();
+			}
+			catch
+			{
+				// Nobody else holds a reference yet, so release the connection here.
+				connection.Dispose();
+				throw;
+			}
+
+			Tracer.Debug("Connection open");
+			return connection;
+		}
+
+		/// <summary>
+		/// Creates a session on an already started connection.
+		/// </summary>
+		/// <param name="connection">The connection to the message broker.</param>
+		/// <returns>The session created by <paramref name="connection"/>.</returns>
+		/// <exception cref="InvalidOperationException"><paramref name="connection" /> reports that
+		/// it has not been started.</exception>
+		private ISession OpenSession(IConnection connection)
+		{
+			if(connection.IsStarted)
+			{
+				Tracer.Debug("Opening session...");
+				ISession openedSession = connection.CreateSession();
+				Tracer.Debug("Session open");
+				return openedSession;
+			}
+
+			throw new InvalidOperationException("The connection has not yet been opened");
+		}
+
+		/// <summary>
+		/// Creates a consumer for <paramref name="destination"/> on the given
+		/// <paramref name="session"/> and routes its messages to <c>OnReceiveMessage</c>.
+		/// </summary>
+		/// <param name="session">The session that creates the consumer.</param>
+		/// <param name="destination">The destination to consume from.</param>
+		/// <returns>The new consumer, with <c>OnReceiveMessage</c> attached as its listener.</returns>
+		private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
+		{
+			Tracer.Debug("Creating message listener...");
+
+			IMessageConsumer messageConsumer = session.CreateConsumer(destination);
+			messageConsumer.Listener += OnReceiveMessage;
+
+			Tracer.Debug("Created message listener");
+			return messageConsumer;
+		}
+
+		/// <summary>
+		/// Event handler that decodes a received NMS text message into a WCF
+		/// <see cref="Message"/> and dispatches it.
+		/// </summary>
+		/// <remarks>
+		/// The message text is encoded as UTF-8 directly into a buffer taken from the buffer
+		/// manager and then handed to the message encoder. UTF-8 yields the same bytes as ASCII
+		/// for ASCII-only text, but does not replace other characters with '?'.
+		/// Messages that are not text messages, or that carry no text, are logged and ignored.
+		/// </remarks>
+		/// <param name="message">The message delivered by the consumer.</param>
+		private void OnReceiveMessage(IMessage message)
+		{
+			Tracer.Debug("Decoding message");
+
+			ITextMessage textMessage = message as ITextMessage;
+			if(textMessage == null || textMessage.Text == null)
+			{
+				Tracer.ErrorFormat("Ignoring message that is not a text message with content: {0}",
+					(message == null) ? "null" : message.GetType().FullName);
+				return;
+			}
+
+			string soapMsg = textMessage.Text;
+			Encoding encoding = Encoding.UTF8;
+
+			// Size the pooled buffer from the encoded length and encode straight into it,
+			// avoiding an intermediate byte array.
+			int dataLength = encoding.GetByteCount(soapMsg);
+			byte[] pooledBuffer = _bufferManager.TakeBuffer(dataLength);
+			encoding.GetBytes(soapMsg, 0, soapMsg.Length, pooledBuffer, 0);
+
+			// The pooled buffer may be larger than requested; expose only the encoded bytes.
+			ArraySegment<byte> data = new ArraySegment<byte>(pooledBuffer, 0, dataLength);
+			Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+
+			Tracer.Debug(msg);
+			Dispatch(msg);
+		}
+
+		/// <summary>
+		/// Handles exceptions reported through the connection's exception listener.
+		/// </summary>
+		/// <remarks>
+		/// An <see cref="NMSException" /> is logged (type name, message and stack trace) and
+		/// otherwise ignored; any other exception is thrown from this handler.
+		/// </remarks>
+		/// <param name="exception">The exception that was reported.</param>
+		private void OnExceptionThrown(Exception exception)
+		{
+			bool isNmsFailure = exception is NMSException;
+			if(!isNmsFailure)
+			{
+				// TODO: can we recover from the exception? Do we convert to WCF exceptions?
+				throw exception;
+			}
+
+			Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
+				exception.GetType().Name,
+				exception.Message,
+				exception.StackTrace);
+		}
+
+		/// <summary>
+		/// Makes sure this listener has a channel, queuing the channel for dispatch when this
+		/// call is the one that created it.
+		/// </summary>
+		private void EnsureChannelAvailable()
+		{
+			NmsInputSessionChannel channel;
+			bool createdHere = CreateOrRetrieveChannel(out channel);
+			if(!createdHere)
+			{
+				// An existing channel was returned; it is not enqueued again.
+				return;
+			}
+
+			_channelQueue.EnqueueAndDispatch(channel);
+		}
+
+		#region Private members
+
+		// Not referenced in the visible methods (they use the Uri property);
+		// presumably the backing field for that property - TODO confirm.
+		private readonly Uri _uri;
+		// Connection opened by CreateNmsChannel via OpenConnection.
+		private IConnection _connection;
+		// Session opened on _connection by CreateNmsChannel via OpenSession.
+		private ISession _session;
+		// Destination resolved by CreateNmsChannel through SessionUtil.GetDestination.
+		private IDestination _destination;
+		// Consumer created by CreateNmsChannel; its listener is OnReceiveMessage.
+		private IMessageConsumer _consumer;
+		// Queue that newly created channels are enqueued and dispatched on.
+		private readonly InputQueue<IInputSessionChannel> _channelQueue;
+		// The channel currently attached to this listener, or null when there is none.
+		// Set in CreateOrRetrieveChannel and cleared in OnChannelClosed, both under _currentChannelLock.
+		private NmsInputSessionChannel _currentChannel;
+		// Lock object guarding writes to _currentChannel.
+		private readonly object _currentChannelLock;
+		// Supplies the encoder that OnReceiveMessage uses to read incoming messages.
+		private readonly MessageEncoderFactory _messageEncoderFactory;
+		// Source of the byte buffers used by OnReceiveMessage.
+		private readonly BufferManager _bufferManager;
+		// Not referenced in the visible methods (they use the Destination property);
+		// presumably the backing field for that property - TODO confirm.
+		private string _destinationName;
+		// Not referenced in the visible methods (they use the DestinationType property);
+		// presumably the backing field for that property - TODO confirm.
+		private DestinationType _destinationType;
+
+		#endregion
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs Fri Dec 12 10:25:52 2008
@@ -20,28 +20,30 @@
 using System.ServiceModel.Channels;
 using System.Text;
 using System.Xml;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.WCF
 {
 	/// <summary>
-	/// Channel for sending messages.
+	/// Client-side implementation of the sessionless one-way channel.
 	/// </summary>
-	public class NmsOutputChannel : NmsChannelBase, IOutputChannel
+	public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel
 	{
 		#region Constructors
 
 		/// <summary>
 		/// Initializes a new instance of the <see cref="NmsOutputChannel"/> class.
 		/// </summary>
+		/// <param name="factory">The factory that created the channel.</param>
+		/// <param name="remoteAddress">The remote address of the channel.</param>
+		/// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
 		/// <param name="bufferManager">The buffer manager.</param>
 		/// <param name="encoderFactory">The encoder factory.</param>
-		/// <param name="address">The address.</param>
-		/// <param name="parent">The parent.</param>
-		/// <param name="via">The via.</param>
-		public NmsOutputChannel(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, NmsChannelFactory parent, Uri via)
-			: base(bufferManager, encoderFactory, address, parent, parent.Destination, parent.DestinationType)
+		/// <param name="destination">The name of the ActiveMQ destination.</param>
+		/// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+		public NmsOutputChannel(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+			: base(factory, remoteAddress, via, bufferManager, encoderFactory, destination, destinationType)
 		{
-			_via = via;
 			_connection = ConnectionFactoryManager.GetInstance().CreateConnection(via);
 			_connection.Start();
 		}
@@ -67,16 +69,14 @@
 		public void Send(Message message, TimeSpan timeout)
 		{
 			ThrowIfDisposedOrNotOpen();
+			RemoteAddress.ApplyTo(message);
 
 			using(NMS.ISession session = _connection.CreateSession())
 			{
-				IDestination destination = NmsChannelHelper.GetDestination(session, Destination, DestinationType);
+				IDestination destination = SessionUtil.GetDestination(session, Destination, DestinationType);
 				using(IMessageProducer producer = session.CreateProducer(destination))
 				{
 					producer.Persistent = true;
-					message.Headers.To = RemoteAddress.Uri;
-					//TODO: check if this is synonymous with the above operation
-					//RemoteAddress.ApplyTo(message);
 
 					ITextMessage request = session.CreateTextMessage(TranslateMessage(message));
 					producer.Send(request);
@@ -94,7 +94,7 @@
 		/// <param name="message">The message to be translated.</param>
 		private string TranslateMessage(Message message)
 		{
-			return (Encoder.MessageVersion == MessageVersion.Soap11)
+			return (this.Encoder.MessageVersion == MessageVersion.Soap11)
 				? TranslateMessageAsSoap11(message)
 				: TranslateMessageAsSoap12(message);
 		}
@@ -169,17 +169,6 @@
 			NmsAsyncResult.End(result);
 		}
 
-		/// <summary>
-		/// Gets the URI that contains the transport address to which messages are sent on the output channel.
-		/// </summary>
-		/// <returns>
-		/// The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel.
-		/// </returns>
-		public Uri Via
-		{
-			get { return _via; }
-		}
-
 		#endregion
 
 		#region Implementation of CommunicationObject
@@ -307,9 +296,8 @@
 
 		#region Private members
 
-		private readonly Uri _via;
 		private readonly IConnection _connection;
 
 		#endregion
 	}
-}
\ No newline at end of file
+}

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Base class for NMS output channels.
+	/// </summary>
+	/// <remarks>
+	/// Holds the addressing, buffer, encoder and destination settings shared by the output
+	/// channels and supplies the open/close/abort overrides required by <see cref="ChannelBase"/>.
+	/// Opening, closing and aborting perform no work at this level.
+	/// </remarks>
+	public abstract class NmsOutputChannelBase : ChannelBase
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsOutputChannelBase"/> class.
+		/// </summary>
+		/// <param name="factory">The factory that created the channel.</param>
+		/// <param name="remoteAddress">The remote address for the channel.</param>
+		/// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
+		/// <param name="bufferManager">The buffer manager.</param>
+		/// <param name="encoderFactory">The encoder factory.</param>
+		/// <param name="destination">The name of the ActiveMQ destination.</param>
+		/// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+		internal NmsOutputChannelBase(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+			: base(factory)
+		{
+			_remoteAddress = remoteAddress;
+			_via = via;
+			_bufferManager = bufferManager;
+			_encoderFactory = encoderFactory;
+			_destination = destination;
+			_destinationType = destinationType;
+		}
+
+		#endregion
+
+		#region NullRequestContextCollection
+
+		//public NmsAsyncRequestContextCollection PendingRequests
+		//{
+		//    get { return _pendingRequests; }
+		//}
+
+		#endregion
+
+		#region Public properties
+
+		/// <summary>
+		/// Gets the remote address.
+		/// </summary>
+		/// <value>The remote address supplied to the constructor.</value>
+		public EndpointAddress RemoteAddress
+		{
+			get { return _remoteAddress; }
+		}
+
+		/// <summary>
+		/// Gets the routing address.
+		/// </summary>
+		/// <value>The transport URI supplied to the constructor.</value>
+		public Uri Via
+		{
+			get { return _via; }
+		}
+
+		/// <summary>
+		/// Gets the buffer manager.
+		/// </summary>
+		/// <value>The buffer manager supplied to the constructor.</value>
+		public BufferManager BufferManager
+		{
+			get { return _bufferManager; }
+		}
+
+		/// <summary>
+		/// Gets the encoder.
+		/// </summary>
+		/// <value>The encoder exposed by the encoder factory supplied to the constructor.</value>
+		public MessageEncoder Encoder
+		{
+			get { return _encoderFactory.Encoder; }
+		}
+
+		/// <summary>
+		/// Gets the name of the destination (either a queue or a topic).
+		/// </summary>
+		/// <value>The name of the destination.</value>
+		public string Destination
+		{
+			get { return _destination; }
+		}
+
+		/// <summary>
+		/// Gets the type of the destination.
+		/// </summary>
+		/// <value>The type of the destination.</value>
+		public DestinationType DestinationType
+		{
+			get { return _destinationType; }
+		}
+
+		#endregion
+
+		#region Abort
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
+		/// This implementation does nothing.
+		/// </summary>
+		protected override void OnAbort()
+		{
+			//_pendingRequests.AbortAll();
+		}
+
+		#endregion
+
+		#region Close
+
+		/// <summary>
+		/// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
+		/// This implementation runs <see cref="OnClose"/> synchronously before returning.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation.
+		/// </returns>
+		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnClose(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
+		/// This implementation does nothing.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out (not used here).</param>
+		protected override void OnClose(TimeSpan timeout)
+		{
+			//_pendingRequests.AbortAll();
+		}
+
+		/// <summary>
+		/// Completes an asynchronous operation on the close of a communication object.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> that was returned by <see cref="OnBeginClose"/>.</param>
+		protected override void OnEndClose(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region Open
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+		/// This implementation runs <see cref="OnOpen"/> synchronously before returning.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation.
+		/// </returns>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnOpen(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions into the opening state.
+		/// This implementation does nothing.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out (not used here).</param>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+		}
+
+		/// <summary>
+		/// Completes an asynchronous operation on the open of a communication object.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> that was returned by <see cref="OnBeginOpen"/>.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region GetProperty
+
+		/// <summary>
+		/// Gets a typed object requested from this channel.
+		/// </summary>
+		/// <remarks>
+		/// A request for <see cref="FaultConverter"/> is answered with the default fault converter
+		/// for the message version of this channel's encoder; every other request is forwarded to
+		/// the base class.
+		/// </remarks>
+		/// <typeparam name="T">The type of object being requested.</typeparam>
+		/// <returns>The requested object, or whatever the base class returns for types not handled here.</returns>
+		public override T GetProperty<T>()
+		{
+			if(typeof(T) == typeof(FaultConverter))
+			{
+				// Match the fault converter to the envelope version the channel encodes with;
+				// keep the former hard-coded version as the fallback when no encoder is available.
+				MessageVersion version = MessageVersion.Soap12WSAddressing10;
+				if(_encoderFactory != null && _encoderFactory.Encoder != null)
+				{
+					version = _encoderFactory.Encoder.MessageVersion;
+				}
+
+				return FaultConverter.GetDefaultFaultConverter(version) as T;
+			}
+			return base.GetProperty<T>();
+		}
+
+		#endregion
+
+		#region Private members
+
+		// All of these are assigned once, in the constructor.
+		private readonly EndpointAddress _remoteAddress;
+		private readonly Uri _via;
+		private readonly BufferManager _bufferManager;
+		private readonly MessageEncoderFactory _encoderFactory;
+		private readonly string _destination;
+		private readonly DestinationType _destinationType;
+
+		// for request/reply pattern
+		//NullAsyncRequestContextCollection _pendingRequests = new NullAsyncRequestContextCollection();
+
+		#endregion
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Client-side implementation of the sessioned one-way channel.
+	/// </summary>
+	/// <remarks>
+	/// Adds an <see cref="IOutputSession"/> with its own identifier on top of
+	/// <see cref="NmsOutputChannel"/>.
+	/// </remarks>
+	internal class NmsOutputSessionChannel : NmsOutputChannel, IOutputSessionChannel
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsOutputSessionChannel"/> class.
+		/// </summary>
+		/// <remarks>
+		/// Note the parameter order: <paramref name="via"/> comes before <paramref name="address"/>
+		/// here, whereas the base constructor takes the address first.
+		/// </remarks>
+		/// <param name="factory">The factory that created this channel.</param>
+		/// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
+		/// <param name="address">The address of this channel.</param>
+		/// <param name="bufferManager">The buffer manager.</param>
+		/// <param name="encoderFactory">The encoder factory.</param>
+		/// <param name="destination">The name of the ActiveMQ destination.</param>
+		/// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+		public NmsOutputSessionChannel(ChannelManagerBase factory, Uri via, EndpointAddress address, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+			: base(factory, address, via, bufferManager, encoderFactory, destination, destinationType)
+		{
+		}
+
+		#endregion
+
+		#region ISessionChannel<IOutputSession> Members
+
+		/// <summary>
+		/// Gets the session associated with this channel.
+		/// </summary>
+		/// <value>The session object created together with this channel.</value>
+		public IOutputSession Session
+		{
+			get { return _session; }
+		}
+
+		/// <summary>
+		/// Internal implementation of a session, with tracking ID.
+		/// </summary>
+		private class OutputSession : IOutputSession
+		{
+			// Obtained once, when the session object is constructed.
+			private readonly string _sessionId = NmsChannelHelper.CreateUniqueSessionId();
+
+			/// <summary>
+			/// Gets the ID that identifies the session.
+			/// </summary>
+			/// <value>The ID obtained from <c>NmsChannelHelper.CreateUniqueSessionId</c> at construction.</value>
+			public string Id
+			{
+				get { return _sessionId; }
+			}
+		}
+
+		#endregion
+
+		#region Private members
+
+		// One session per channel instance; never replaced after construction.
+		private readonly IOutputSession _session = new OutputSession();
+
+		#endregion
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj Fri Dec 12 10:25:52 2008
@@ -66,12 +66,13 @@
     </NoWarn>
   </PropertyGroup>
   <ItemGroup>
-    <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, processorArchitecture=MSIL">
+    <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, PublicKeyToken=2a329723af30bc8d, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>lib\Apache.NMS\net-3.5\Apache.NMS.dll</HintPath>
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.configuration" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Runtime.Serialization">
       <RequiredTargetFramework>3.0</RequiredTargetFramework>
     </Reference>
@@ -121,12 +122,16 @@
     <Compile Include="src\main\csharp\ConnectionFactoryManager.cs" />
     <Compile Include="src\main\csharp\InputQueue.cs" />
     <Compile Include="src\main\csharp\NmsAsyncResult.cs" />
-    <Compile Include="src\main\csharp\NmsChannelBase.cs" />
+    <Compile Include="src\main\csharp\NmsInputQueueChannelBase.cs" />
     <Compile Include="src\main\csharp\NmsChannelFactory.cs" />
     <Compile Include="src\main\csharp\NmsChannelHelper.cs" />
-    <Compile Include="src\main\csharp\NmsChannelListener.cs" />
+    <Compile Include="src\main\csharp\NmsInputChannelListener.cs" />
     <Compile Include="src\main\csharp\NmsInputChannel.cs" />
+    <Compile Include="src\main\csharp\NmsInputSessionChannel.cs" />
+    <Compile Include="src\main\csharp\NmsInputSessionChannelListener.cs" />
     <Compile Include="src\main\csharp\NmsOutputChannel.cs" />
+    <Compile Include="src\main\csharp\NmsOutputChannelBase.cs" />
+    <Compile Include="src\main\csharp\NmsOutputSessionChannel.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets" />
   <PropertyGroup>
@@ -135,4 +140,4 @@
     <PostBuildEvent>cd $(ProjectDir)
 nant -nologo -q install-all -D:compile.skip=true</PostBuildEvent>
   </PropertyGroup>
-</Project>
\ No newline at end of file
+</Project>