You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/12/12 19:25:52 UTC
svn commit: r726083 [2/2] - in
/activemq/activemq-dotnet/Apache.NMS.WCF/trunk: ./ src/main/csharp/
src/main/csharp/Configuration/
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Base class for NMS input channels. Items passed to <see cref="Dispatch"/> are held in an
+ /// internal <see cref="InputQueue{T}"/> until they are dequeued.
+ /// </summary>
+ /// <typeparam name="T">The type of item held in the channel's queue.</typeparam>
+ public abstract class NmsInputQueueChannelBase<T> : ChannelBase where T : class
+ {
+     #region Constructors
+
+     /// <summary>
+     /// Initializes a new instance of the <see cref="NmsInputQueueChannelBase{T}"/> class.
+     /// </summary>
+     /// <param name="factory">The factory that was used to create the channel.</param>
+     /// <param name="localAddress">The local address of the channel.</param>
+     public NmsInputQueueChannelBase(ChannelListenerBase factory, EndpointAddress localAddress)
+         : base(factory)
+     {
+         _localAddress = localAddress;
+         _messageQueue = new InputQueue<T>();
+     }
+
+     #endregion
+
+     #region Public properties
+
+     /// <summary>
+     /// Gets the local address.
+     /// </summary>
+     /// <value>The local address that was passed to the constructor.</value>
+     public EndpointAddress LocalAddress
+     {
+         get { return _localAddress; }
+     }
+
+     #endregion
+
+     #region Messaging
+
+     /// <summary>
+     /// Gets the pending message count.
+     /// </summary>
+     /// <value>The pending count reported by the internal queue.</value>
+     public int PendingMessageCount
+     {
+         get
+         {
+             return _messageQueue.PendingCount;
+         }
+     }
+
+     /// <summary>
+     /// Dispatches the specified request by placing it on the internal queue.
+     /// The channel state is checked first via <c>ThrowIfDisposedOrNotOpen</c>.
+     /// </summary>
+     /// <param name="request">The request.</param>
+     public void Dispatch(T request)
+     {
+         ThrowIfDisposedOrNotOpen();
+         _messageQueue.EnqueueAndDispatch(request);
+     }
+
+     /// <summary>
+     /// Begins the dequeue operation.
+     /// </summary>
+     /// <param name="timeout">The timeout.</param>
+     /// <param name="callback">The callback.</param>
+     /// <param name="state">The state.</param>
+     /// <returns>
+     /// The result of starting a dequeue on the internal queue when the channel is in the
+     /// <see cref="CommunicationState.Opened"/> state; otherwise a new <c>CompletedAsyncResult</c>.
+     /// </returns>
+     public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
+     {
+         return (State == CommunicationState.Opened)
+             ? _messageQueue.BeginDequeue(timeout, callback, state)
+             : new CompletedAsyncResult(callback, state);
+     }
+
+     /// <summary>
+     /// Ends the dequeue operation.
+     /// The channel state is checked first via <c>ThrowIfDisposedOrNotOpen</c>.
+     /// </summary>
+     /// <param name="result">The result.</param>
+     /// <returns>The item returned by the internal queue for <paramref name="result"/>.</returns>
+     public T EndDequeue(IAsyncResult result)
+     {
+         ThrowIfDisposedOrNotOpen();
+         return _messageQueue.EndDequeue(result);
+     }
+
+     /// <summary>
+     /// Dequeues the next message.
+     /// The channel state is checked first via <c>ThrowIfDisposedOrNotOpen</c>.
+     /// </summary>
+     /// <param name="timeout">The timeout.</param>
+     /// <returns>The item returned by the internal queue.</returns>
+     public T Dequeue(TimeSpan timeout)
+     {
+         ThrowIfDisposedOrNotOpen();
+         return _messageQueue.Dequeue(timeout);
+     }
+
+     /// <summary>
+     /// Tries to dequeue the next message by completing a pending asynchronous dequeue.
+     /// </summary>
+     /// <param name="result">The asynchronous result to complete.</param>
+     /// <param name="message">Receives the dequeued message, or <c>null</c> when none was obtained.</param>
+     /// <returns>The value of <see cref="IAsyncResult.IsCompleted"/> for <paramref name="result"/>.</returns>
+     /// <exception cref="T:System.ArgumentNullException"><paramref name="result"/> is <c>null</c>.</exception>
+     public bool TryDequeue(IAsyncResult result, out T message)
+     {
+         if(result == null)
+         {
+             throw new ArgumentNullException("result");
+         }
+
+         message = null;
+         if(result is TypedAsyncResult<T>)
+         {
+             message = TypedAsyncResult<T>.End(result);
+         }
+         else if(!result.CompletedSynchronously)
+         {
+             InputQueue<T>.AsyncQueueReader.End(result, out message);
+         }
+         // Any other result that completed synchronously leaves message as null.
+         return result.IsCompleted;
+     }
+
+     #endregion
+
+     #region Abort
+
+     /// <summary>
+     /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
+     /// Closes the internal queue.
+     /// </summary>
+     protected override void OnAbort()
+     {
+         _messageQueue.Close();
+     }
+
+     #endregion
+
+     #region Open
+
+     /// <summary>
+     /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+     /// The open is performed synchronously by calling <see cref="OnOpen"/> before this method returns.
+     /// </summary>
+     /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+     /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+     /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+     /// <returns>
+     /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation.
+     /// </returns>
+     /// <exception cref="T:System.ArgumentOutOfRangeException">
+     /// <paramref name="timeout"/> is less than zero.</exception>
+     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+     {
+         OnOpen(timeout);
+         return new CompletedAsyncResult(callback, state);
+     }
+
+     /// <summary>
+     /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
+     /// Opens the internal queue; <paramref name="timeout"/> is not used by this implementation.
+     /// </summary>
+     /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+     /// <exception cref="T:System.ArgumentOutOfRangeException">
+     /// <paramref name="timeout"/> is less than zero.</exception>
+     /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception>
+     protected override void OnOpen(TimeSpan timeout)
+     {
+         _messageQueue.Open();
+     }
+
+     /// <summary>
+     /// Completes an asynchronous operation on the open of a communication object.
+     /// </summary>
+     /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnBeginOpen(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+     /// <exception cref="T:System.TimeoutException">The interval of time that was allotted for the open operation was exceeded before the operation was completed.</exception>
+     protected override void OnEndOpen(IAsyncResult result)
+     {
+         CompletedAsyncResult.End(result);
+     }
+
+     #endregion
+
+     #region Close
+
+     /// <summary>
+     /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
+     /// The close is performed synchronously by calling <see cref="OnClose"/> before this method returns.
+     /// </summary>
+     /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+     /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+     /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+     /// <returns>
+     /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation.
+     /// </returns>
+     /// <exception cref="T:System.ArgumentOutOfRangeException">
+     /// <paramref name="timeout"/> is less than zero.</exception>
+     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+     {
+         OnClose(timeout);
+         return new CompletedAsyncResult(callback, state);
+     }
+
+     /// <summary>
+     /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
+     /// Closes the internal queue; <paramref name="timeout"/> is not used by this implementation.
+     /// </summary>
+     /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+     /// <exception cref="T:System.ArgumentOutOfRangeException">
+     /// <paramref name="timeout"/> is less than zero.</exception>
+     protected override void OnClose(TimeSpan timeout)
+     {
+         _messageQueue.Close();
+     }
+
+     /// <summary>
+     /// Completes an asynchronous operation on the close of a communication object.
+     /// </summary>
+     /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnBeginClose(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+     /// <exception cref="T:System.TimeoutException">The interval of time that was allotted for the close operation was exceeded before the operation was completed.</exception>
+     protected override void OnEndClose(IAsyncResult result)
+     {
+         CompletedAsyncResult.End(result);
+     }
+
+     #endregion
+
+     #region GetProperty
+
+     /// <summary>
+     /// Gets the property. A request for <see cref="FaultConverter"/> is answered with the default
+     /// converter for <see cref="MessageVersion.Soap12WSAddressing10"/>; every other request is
+     /// forwarded to the base class.
+     /// </summary>
+     /// <typeparam name="P">The type of the requested property.</typeparam>
+     /// <returns>The requested property object, as produced by the rules above.</returns>
+     public override P GetProperty<P>()
+     {
+         if(typeof(P) == typeof(FaultConverter))
+         {
+             return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as P;
+         }
+         return base.GetProperty<P>();
+     }
+
+     #endregion
+
+     #region Private members
+
+     // Both fields are assigned exactly once, in the constructor.
+     private readonly EndpointAddress _localAddress;
+     private readonly InputQueue<T> _messageQueue;
+
+     #endregion
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Sessioned one-way input channel used on the server side. Adds an
+ /// <see cref="IInputSession"/> to what it inherits from <see cref="NmsInputChannel"/>.
+ /// </summary>
+ public class NmsInputSessionChannel : NmsInputChannel, IInputSessionChannel
+ {
+     #region Constructors
+
+     /// <summary>
+     /// Creates a channel for the given factory and local address.
+     /// </summary>
+     /// <param name="factory">The factory that was used to create the channel.</param>
+     /// <param name="localAddress">The local address of the channel.</param>
+     internal NmsInputSessionChannel(ChannelListenerBase factory, EndpointAddress localAddress)
+         : base(factory, localAddress)
+     {
+     }
+
+     #endregion
+
+     #region ISessionChannel<IInputSession> Members
+
+     /// <summary>
+     /// Gets the session associated with this channel.
+     /// </summary>
+     /// <value>The session object created together with the channel.</value>
+     public IInputSession Session
+     {
+         get
+         {
+             return _session;
+         }
+     }
+
+     /// <summary>
+     /// Session implementation that carries nothing but a tracking ID.
+     /// </summary>
+     private class InputSession : IInputSession, System.ServiceModel.Channels.ISession
+     {
+         // Generated once, when the session object is constructed.
+         private string _id = NmsChannelHelper.CreateUniqueSessionId();
+
+         /// <summary>
+         /// Gets the ID that identifies the session.
+         /// </summary>
+         /// <value>The ID obtained from <c>NmsChannelHelper.CreateUniqueSessionId</c>.</value>
+         public string Id
+         {
+             get
+             {
+                 return _id;
+             }
+         }
+     }
+
+     #endregion
+
+     #region Private members
+
+     private IInputSession _session = new InputSession();
+
+     #endregion
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Server-side listener for sessioned input channels.
+ /// </summary>
+ public class NmsInputSessionChannelListener : ChannelListenerBase<IInputSessionChannel>, IChannel
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="NmsInputSessionChannelListener"/> class.
+ /// </summary>
+ /// <param name="transportElement">The binding element.</param>
+ /// <param name="context">The context.</param>
+ internal NmsInputSessionChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
+     : base(context.Binding)
+ {
+     _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize);
+
+     // Take the encoding element out of the binding parameters; fall back to
+     // the NMS default encoder factory when the binding supplies none.
+     MessageEncodingBindingElement encodingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+     if(encodingElement == null)
+     {
+         _messageEncoderFactory = NmsConstants.DefaultMessageEncoderFactory;
+     }
+     else
+     {
+         _messageEncoderFactory = encodingElement.CreateMessageEncoderFactory();
+     }
+
+     _channelQueue = new InputQueue<IInputSessionChannel>();
+     _currentChannelLock = new object();
+
+     // The destination name and type come from the transport element; the
+     // listen URI is composed from the context's base and relative addresses.
+     _destinationName = transportElement.Destination;
+     _destinationType = transportElement.DestinationType;
+     _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+
+     Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+ }
+
+ #endregion
+
+ #region Public properties
+
+ /// <summary>
+ /// Gets the message encoder factory.
+ /// </summary>
+ /// <value>The message encoder factory.</value>
+ public MessageEncoderFactory MessageEncoderFactory
+ {
+     get
+     {
+         // Chosen once in the constructor and never reassigned (the field is readonly).
+         return _messageEncoderFactory;
+     }
+ }
+
+ /// <summary>
+ /// Gets or sets the destination.
+ /// </summary>
+ /// <value>The destination.</value>
+ public string Destination
+ {
+     get
+     {
+         return _destinationName;
+     }
+     set
+     {
+         // Only stores the name; nothing is re-created here.
+         _destinationName = value;
+     }
+ }
+
+ /// <summary>
+ /// Gets or sets the type of the destination.
+ /// </summary>
+ /// <value>The type of the destination.</value>
+ public DestinationType DestinationType
+ {
+     get
+     {
+         return _destinationType;
+     }
+     set
+     {
+         // Only stores the type; nothing is re-created here.
+         _destinationType = value;
+     }
+ }
+
+ #endregion
+
+ #region Implementation of CommunicationObject
+
+ /// <summary>
+ /// Inserts processing on a communication object after it transitions to the closing state
+ /// due to the invocation of a synchronous abort operation.
+ /// </summary>
+ /// <remarks>
+ /// Abort can be called at any time, so we can't assume that we've been Opened successfully
+ /// (and thus may not have any listen sockets).
+ /// </remarks>
+ protected override void OnAbort()
+ {
+     // Abort reuses the regular close path, passing a zero timeout.
+     TimeSpan noWait = TimeSpan.Zero;
+     OnClose(noWait);
+ }
+
+ /// <summary>
+ /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+ protected override void OnClose(TimeSpan timeout)
+ {
+     lock(ThisLock)
+     {
+         // Release consumer, session and connection in that order. The nested
+         // finally blocks guarantee that a failure in one step does not leave
+         // the remaining resources, or the channel queue, open.
+         try
+         {
+             CloseConsumer();
+         }
+         finally
+         {
+             try
+             {
+                 CloseSession();
+             }
+             finally
+             {
+                 try
+                 {
+                     CloseConnection();
+                 }
+                 finally
+                 {
+                     _channelQueue.Close();
+                 }
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Closes and disposes the message consumer, if one was created.
+ /// </summary>
+ private void CloseConsumer()
+ {
+     // Clear the field first so that a later close or abort does not touch the same object again.
+     IMessageConsumer consumer = _consumer;
+     _consumer = null;
+     if(consumer == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating consumer...");
+     try
+     {
+         consumer.Close();
+     }
+     finally
+     {
+         consumer.Dispose();
+     }
+     Tracer.Debug("Listener has terminated consumer");
+ }
+
+ /// <summary>
+ /// Closes the session, if one was created.
+ /// </summary>
+ private void CloseSession()
+ {
+     ISession session = _session;
+     _session = null;
+     if(session == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating session...");
+     session.Close();
+     Tracer.Debug("Listener has terminated session");
+ }
+
+ /// <summary>
+ /// Stops, closes and disposes the connection, if one was created.
+ /// </summary>
+ private void CloseConnection()
+ {
+     IConnection connection = _connection;
+     _connection = null;
+     if(connection == null)
+     {
+         return;
+     }
+
+     Tracer.Debug("Listener is terminating connection...");
+     try
+     {
+         connection.Stop();
+         connection.Close();
+     }
+     finally
+     {
+         connection.Dispose();
+     }
+     Tracer.Debug("Listener has terminated connection");
+ }
+
+ /// <summary>
+ /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+ protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     // The close runs synchronously on the calling thread; the result is
+     // built from the caller's callback and state afterwards.
+     OnClose(timeout);
+     IAsyncResult closeResult = new CompletedAsyncResult(callback, state);
+     return closeResult;
+ }
+
+ /// <summary>
+ /// Completes an asynchronous operation on the close of a communication object.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)" /> method.</param>
+ protected override void OnEndClose(IAsyncResult result)
+ {
+ // OnBeginClose in this class returns a CompletedAsyncResult; ending it is all that is left to do.
+ CompletedAsyncResult.End(result);
+ }
+
+ /// <summary>
+ /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+ /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout" /> that was allotted for the operation was exceeded before the operation was completed.</exception>
+ protected override void OnOpen(TimeSpan timeout)
+ {
+     Uri listenUri = Uri;
+     if(listenUri == null)
+     {
+         throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+     }
+
+     // Nothing is connected here; opening only validates the timeout.
+     NmsChannelHelper.ValidateTimeout(timeout);
+ }
+
+ /// <summary>
+ /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+ protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     // The open runs synchronously on the calling thread; the result is
+     // built from the caller's callback and state afterwards.
+     OnOpen(timeout);
+     IAsyncResult openResult = new CompletedAsyncResult(callback, state);
+     return openResult;
+ }
+
+ /// <summary>
+ /// Completes an asynchronous operation on the open of a communication object.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)" /> method.</param>
+ protected override void OnEndOpen(IAsyncResult result)
+ {
+ // OnBeginOpen in this class returns a CompletedAsyncResult; ending it is all that is left to do.
+ CompletedAsyncResult.End(result);
+ }
+
+ #endregion
+
+ #region Implementation of ChannelListenerBase
+
+ /// <summary>
+ /// When implemented in derived class, gets the URI on which the channel listener listens for an incoming channel.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.Uri" /> on which the channel listener listens for incoming channels.
+ /// </returns>
+ public override Uri Uri
+ {
+     get
+     {
+         // Composed in the constructor from the binding context's listen addresses.
+         return _uri;
+     }
+ }
+
+ /// <summary>
+ /// When overridden in a derived class, provides a point of extensibility when waiting for a channel to arrive.
+ /// </summary>
+ /// <returns>
+ /// true if the method completed before the interval of time specified by the <paramref name="timeout" /> expired; otherwise false.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on wait for a channel operation has to complete before timing out.</param>
+ protected override bool OnWaitForChannel(TimeSpan timeout)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     // The wait is delegated to the queue of channels awaiting acceptance.
+     bool waitResult = _channelQueue.WaitForItem(timeout);
+     return waitResult;
+ }
+
+ /// <summary>
+ /// When implemented in a derived class, provides a point of extensibility when starting to wait for a channel to arrive.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on begin wait operation.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on begin wait operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation on begin wait completion.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation.</param>
+ protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     // The asynchronous wait is delegated to the queue of channels awaiting acceptance.
+     IAsyncResult pendingWait = _channelQueue.BeginWaitForItem(timeout, callback, state);
+     return pendingWait;
+ }
+
+ /// <summary>
+ /// When implemented in a derived class, provides a point of extensibility when ending the waiting for a channel to arrive.
+ /// </summary>
+ /// <returns>
+ /// true if the method completed before the timeout expired; otherwise false.
+ /// </returns>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase.OnBeginWaitForChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
+ protected override bool OnEndWaitForChannel(IAsyncResult result)
+ {
+ // Completes the wait that OnBeginWaitForChannel started on the channel queue.
+ return _channelQueue.EndWaitForItem(result);
+ }
+
+ /// <summary>
+ /// When implemented in a derived class, provides an extensibility point when accepting a channel.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+ protected override IInputSessionChannel OnAcceptChannel(TimeSpan timeout)
+ {
+     Tracer.Debug("Accepting channel");
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     // A channel is only created while the listener has not been disposed.
+     bool listenerUsable = !IsDisposed;
+     if(listenerUsable)
+     {
+         EnsureChannelAvailable();
+     }
+
+     IInputSessionChannel acceptedChannel;
+     bool dequeued = _channelQueue.Dequeue(timeout, out acceptedChannel);
+     if(!dequeued)
+     {
+         throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
+     }
+     return acceptedChannel;
+ }
+
+ /// <summary>
+ /// When implemented in a derived class, provides an asynchronous extensibility point when beginning to accept a channel.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous accept channel operation.
+ /// </returns>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous completion of the accept channel operation.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous accept channel operation.</param>
+ protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+     NmsChannelHelper.ValidateTimeout(timeout);
+
+     // A channel is only created while the listener has not been disposed.
+     bool listenerUsable = !IsDisposed;
+     if(listenerUsable)
+     {
+         EnsureChannelAvailable();
+     }
+
+     IAsyncResult pendingAccept = _channelQueue.BeginDequeue(timeout, callback, state);
+     return pendingAccept;
+ }
+
+ /// <summary>
+ /// When implemented in a derived class, provides an asynchronous extensibility point when completing the acceptance a channel.
+ /// </summary>
+ /// <returns>
+ /// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted by the listener.
+ /// </returns>
+ /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase`1.OnBeginAcceptChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
+ protected override IInputSessionChannel OnEndAcceptChannel(IAsyncResult result)
+ {
+     IInputSessionChannel acceptedChannel;
+     if(_channelQueue.EndDequeue(result, out acceptedChannel))
+     {
+         return acceptedChannel;
+     }
+
+     // Name the listener in the message, as the synchronous OnAcceptChannel does,
+     // so that the failure can be diagnosed from the exception alone.
+     throw new TimeoutException(String.Format("Accept on listener at address {0} timed out.", Uri.AbsoluteUri));
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Dispatches the callback.
+ /// </summary>
+ /// <param name="state">The state.</param>
+ internal void DispatchCallback(object state)
+ {
+ // The state object is cast to a Message and passed to Dispatch(Message).
+ Dispatch((Message) state);
+ }
+
+ /// <summary>
+ /// Hands an incoming message to the listener's current channel, creating
+ /// that channel first when none exists. A newly created channel is also
+ /// queued for whoever is waiting to accept a channel. A <c>null</c> message
+ /// is ignored; any exception raised while dispatching is logged and not rethrown.
+ /// </summary>
+ /// <param name="message">The message to dispatch.</param>
+ internal void Dispatch(Message message)
+ {
+     if(message == null)
+     {
+         return;
+     }
+
+     try
+     {
+         NmsInputSessionChannel targetChannel;
+         bool isNewChannel = CreateOrRetrieveChannel(out targetChannel);
+
+         Tracer.Debug("Dispatching incoming message");
+         targetChannel.Dispatch(message);
+
+         if(!isNewChannel)
+         {
+             return;
+         }
+
+         // Hand the channel off to whomever is waiting for AcceptChannel() to complete
+         Tracer.Debug("Handing off channel");
+         _channelQueue.EnqueueAndDispatch(targetChannel);
+     }
+     catch(Exception error)
+     {
+         Tracer.ErrorFormat("Error dispatching Message: {0}", error.ToString());
+     }
+ }
+
+ /// <summary>
+ /// Returns the listener's current channel, creating it when there is none.
+ /// </summary>
+ /// <param name="newChannel">Receives the current channel, whether it existed already or was just created.</param>
+ /// <returns><c>true</c> when this call created the channel; <c>false</c> when an existing one was returned.</returns>
+ private bool CreateOrRetrieveChannel(out NmsInputSessionChannel newChannel)
+ {
+     // First check without the lock: the common case is that a channel already exists.
+     newChannel = _currentChannel;
+     if(newChannel != null)
+     {
+         return false;
+     }
+
+     lock(_currentChannelLock)
+     {
+         // Check again under the lock; another thread may have created the channel meanwhile.
+         newChannel = _currentChannel;
+         if(newChannel != null)
+         {
+             return false;
+         }
+
+         newChannel = CreateNmsChannel(Uri);
+         newChannel.Closed += OnChannelClosed;
+         _currentChannel = newChannel;
+         return true;
+     }
+ }
+
+ /// <summary>
+ /// Called when the channel is closed.
+ /// </summary>
+ /// <param name="sender">The sender.</param>
+ /// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data.</param>
+ private void OnChannelClosed(object sender, EventArgs args)
+ {
+     NmsInputSessionChannel closedChannel = (NmsInputSessionChannel) sender;
+
+     lock(_currentChannelLock)
+     {
+         // Forget the channel only if it is still the current one.
+         if(_currentChannel != closedChannel)
+         {
+             return;
+         }
+         _currentChannel = null;
+     }
+ }
+
+ /// <summary>
+ /// Creates the <see cref="NmsInputSessionChannel" /> that will wait for inbound messages,
+ /// opening the connection, session, destination and consumer along the way.
+ /// </summary>
+ /// <param name="uri">The URI for the message queue.</param>
+ /// <returns>A new channel whose local address is built from <paramref name="uri"/>.</returns>
+ private NmsInputSessionChannel CreateNmsChannel(Uri uri)
+ {
+     // NOTE(review): every call replaces the connection, session and consumer fields
+     // without closing the objects they referenced before - confirm this is intended.
+     _connection = OpenConnection(uri);
+     _session = OpenSession(_connection);
+     _destination = SessionUtil.GetDestination(_session, Destination, DestinationType);
+     _consumer = CreateConsumer(_session, _destination);
+
+     return new NmsInputSessionChannel(this, new EndpointAddress(uri));
+ }
+
+ /// <summary>
+ /// Opens the connection to the message broker.
+ /// </summary>
+ /// <param name="uri">The URI.</param>
+ /// <returns>An active connection to the ActiveMQ message broker specified by the URI;
+ /// exceptions will be caught by the attached ExceptionListener.</returns>
+ private IConnection OpenConnection(Uri uri)
+ {
+     IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+     try
+     {
+         connection.ExceptionListener += OnExceptionThrown;
+         connection.Start();
+     }
+     catch
+     {
+         // The connection has not been handed to anyone yet, so nobody else can
+         // release it; dispose it here before letting the failure propagate.
+         connection.Dispose();
+         throw;
+     }
+     Tracer.Debug("Connection open");
+     return connection;
+ }
+
+ /// <summary>
+ /// Opens a session to communicate with a message queue.
+ /// </summary>
+ /// <param name="connection">The connection to the ActiveMQ message broker.</param>
+ /// <returns>A session.</returns>
+ /// <exception cref="InvalidOperationException">the <paramref name="connection" /> has not yet
+ /// been started.</exception>
+ private ISession OpenSession(IConnection connection)
+ {
+     // A session is only created on a connection that has been started.
+     bool connectionStarted = connection.IsStarted;
+     if(connectionStarted == false)
+     {
+         throw new InvalidOperationException("The connection has not yet been opened");
+     }
+
+     Tracer.Debug("Opening session...");
+     ISession openedSession = connection.CreateSession();
+     Tracer.Debug("Session open");
+
+     return openedSession;
+ }
+
+ /// <summary>
+ /// Creates the consumer of messages received on the <paramref name="session"/>.
+ /// </summary>
+ /// <param name="session">The session.</param>
+ /// <param name="destination">The destination.</param>
+ /// <returns>A consumer for any messages received during the session;
+ /// messages will be consumed by the attached Listener.</returns>
+ private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
+ {
+     Tracer.Debug("Creating message listener...");
+
+     // Messages received by the consumer are routed to OnReceiveMessage.
+     IMessageConsumer messageConsumer = session.CreateConsumer(destination);
+     messageConsumer.Listener += OnReceiveMessage;
+
+     Tracer.Debug("Created message listener");
+     return messageConsumer;
+ }
+
+ /// <summary>
+ /// Event handler that processes a received message: the text body is encoded into a
+ /// pooled buffer, read into a WCF <see cref="Message"/> by the message encoder and dispatched.
+ /// </summary>
+ /// <param name="message">The message; it is cast to <see cref="ITextMessage"/>.</param>
+ private void OnReceiveMessage(IMessage message)
+ {
+     Tracer.Debug("Decoding message");
+     string soapMsg = ((ITextMessage) message).Text;
+
+     // NOTE(review): ASCII encoding replaces every non-ASCII character with '?';
+     // confirm that senders only emit ASCII text, or switch to the encoding they use.
+     Encoding encoding = Encoding.ASCII;
+
+     // Encode straight into the pooled buffer instead of into a temporary array
+     // that is then copied. The pooled buffer may be larger than requested, so
+     // the segment below is limited to the encoded length.
+     int dataLength = encoding.GetByteCount(soapMsg);
+     byte[] pooledBuffer = _bufferManager.TakeBuffer(dataLength);
+     encoding.GetBytes(soapMsg, 0, soapMsg.Length, pooledBuffer, 0);
+
+     ArraySegment<byte> data = new ArraySegment<byte>(pooledBuffer, 0, dataLength);
+     Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+
+     Tracer.Debug(msg);
+     Dispatch(msg);
+ }
+
+ /// <summary>
+ /// Called when an exception is thrown by the ActiveMQ listener.
+ /// </summary>
+ /// <remarks>
+ /// <see cref="NMSException" />s will be caught and logged; all other exceptions will
+ /// be thrown back up to the WCF service.
+ /// </remarks>
+ /// <param name="exception">The exception that was thrown.</param>
+ private void OnExceptionThrown(Exception exception)
+ {
+     bool isNmsFailure = exception is NMSException;
+     if(!isNmsFailure)
+     {
+         // TODO: can we recover from the exception? Do we convert to WCF exceptions?
+         throw exception;
+     }
+
+     // NMS failures are only logged.
+     Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
+         exception.GetType().Name,
+         exception.Message,
+         exception.StackTrace);
+ }
+
+ /// <summary>
+ /// Guarantees that a channel is attached to this listener.
+ /// </summary>
+ private void EnsureChannelAvailable()
+ {
+     NmsInputSessionChannel channel;
+     bool createdNow = CreateOrRetrieveChannel(out channel);
+     if(!createdNow)
+     {
+         // A channel already exists; it is not queued a second time here.
+         return;
+     }
+
+     _channelQueue.EnqueueAndDispatch(channel);
+ }
+
+ #region Private members
+
+ // Listen URI, composed in the constructor from the binding context.
+ private readonly Uri _uri;
+ // NMS connection, session, destination and consumer; assigned in CreateNmsChannel.
+ private IConnection _connection;
+ private ISession _session;
+ private IDestination _destination;
+ private IMessageConsumer _consumer;
+ // Channels waiting to be handed out by the accept operations.
+ private readonly InputQueue<IInputSessionChannel> _channelQueue;
+ // The channel that currently receives dispatched messages; guarded by _currentChannelLock when written.
+ private NmsInputSessionChannel _currentChannel;
+ private readonly object _currentChannelLock;
+ // Encoder factory and buffer pool used to turn received text into WCF messages.
+ private readonly MessageEncoderFactory _messageEncoderFactory;
+ private readonly BufferManager _bufferManager;
+ // Destination name and type, initialised from the transport binding element.
+ private string _destinationName;
+ private DestinationType _destinationType;
+
+ #endregion
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs Fri Dec 12 10:25:52 2008
@@ -20,28 +20,30 @@
using System.ServiceModel.Channels;
using System.Text;
using System.Xml;
+using Apache.NMS.Util;
namespace Apache.NMS.WCF
{
/// <summary>
- /// Channel for sending messages.
+ /// Client-side implementation of the sessionless one-way channel.
/// </summary>
- public class NmsOutputChannel : NmsChannelBase, IOutputChannel
+ public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel
{
#region Constructors
/// <summary>
/// Initializes a new instance of the <see cref="NmsOutputChannel"/> class.
/// </summary>
+ /// <param name="factory">The factory that created the channel.</param>
+ /// <param name="remoteAddress">The remote address of the channel.</param>
+ /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
/// <param name="bufferManager">The buffer manager.</param>
/// <param name="encoderFactory">The encoder factory.</param>
- /// <param name="address">The address.</param>
- /// <param name="parent">The parent.</param>
- /// <param name="via">The via.</param>
- public NmsOutputChannel(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, NmsChannelFactory parent, Uri via)
- : base(bufferManager, encoderFactory, address, parent, parent.Destination, parent.DestinationType)
+ /// <param name="destination">The name of the ActiveMQ destination.</param>
+ /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+ public NmsOutputChannel(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+ : base(factory, remoteAddress, via, bufferManager, encoderFactory, destination, destinationType)
{
- _via = via;
_connection = ConnectionFactoryManager.GetInstance().CreateConnection(via);
_connection.Start();
}
@@ -67,16 +69,14 @@
public void Send(Message message, TimeSpan timeout)
{
ThrowIfDisposedOrNotOpen();
+ RemoteAddress.ApplyTo(message);
using(NMS.ISession session = _connection.CreateSession())
{
- IDestination destination = NmsChannelHelper.GetDestination(session, Destination, DestinationType);
+ IDestination destination = SessionUtil.GetDestination(session, Destination, DestinationType);
using(IMessageProducer producer = session.CreateProducer(destination))
{
producer.Persistent = true;
- message.Headers.To = RemoteAddress.Uri;
- //TODO: check if this is synonymous with the above operation
- //RemoteAddress.ApplyTo(message);
ITextMessage request = session.CreateTextMessage(TranslateMessage(message));
producer.Send(request);
@@ -94,7 +94,7 @@
/// <param name="message">The message to be translated.</param>
private string TranslateMessage(Message message)
{
- return (Encoder.MessageVersion == MessageVersion.Soap11)
+ return (this.Encoder.MessageVersion == MessageVersion.Soap11)
? TranslateMessageAsSoap11(message)
: TranslateMessageAsSoap12(message);
}
@@ -169,17 +169,6 @@
NmsAsyncResult.End(result);
}
- /// <summary>
- /// Gets the URI that contains the transport address to which messages are sent on the output channel.
- /// </summary>
- /// <returns>
- /// The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel.
- /// </returns>
- public Uri Via
- {
- get { return _via; }
- }
-
#endregion
#region Implementation of CommunicationObject
@@ -307,9 +296,8 @@
#region Private members
- private readonly Uri _via;
private readonly IConnection _connection;
#endregion
}
-}
\ No newline at end of file
+}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Base class for NMS output channels.
+ /// </summary>
+ public abstract class NmsOutputChannelBase : ChannelBase
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="NmsOutputChannelBase"/> class and stores the supplied values behind the read-only properties of the channel.
+ /// </summary>
+ /// <param name="factory">The factory that created the channel; it is passed on to the <see cref="ChannelBase"/> constructor.</param>
+ /// <param name="remoteAddress">The remote address for the channel.</param>
+ /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
+ /// <param name="bufferManager">The buffer manager.</param>
+ /// <param name="encoderFactory">The factory whose message encoder is exposed through <see cref="Encoder"/>.</param>
+ /// <param name="destination">The name of the ActiveMQ destination.</param>
+ /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+ internal NmsOutputChannelBase(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+ : base(factory)
+ {
+ _remoteAddress = remoteAddress;
+ _via = via;
+ _bufferManager = bufferManager;
+ _encoderFactory = encoderFactory;
+ _destination = destination;
+ _destinationType = destinationType;
+ }
+
+ #endregion
+
+ #region NullRequestContextCollection
+
+ //public NmsAsyncRequestContextCollection PendingRequests
+ //{
+ // get { return _pendingRequests; }
+ //}
+
+ #endregion
+
+ #region Public properties
+
+ /// <summary>
+ /// Gets the remote address.
+ /// </summary>
+ /// <value>The remote address that was passed to the constructor.</value>
+ public EndpointAddress RemoteAddress
+ {
+ get { return _remoteAddress; }
+ }
+
+ /// <summary>
+ /// Gets the URI that contains the transport address to which messages are sent on the output channel.
+ /// </summary>
+ /// <value>The routing address that was passed to the constructor.</value>
+ public Uri Via
+ {
+ get { return _via; }
+ }
+
+ /// <summary>
+ /// Gets the buffer manager.
+ /// </summary>
+ /// <value>The buffer manager that was passed to the constructor.</value>
+ public BufferManager BufferManager
+ {
+ get { return _bufferManager; }
+ }
+
+ /// <summary>
+ /// Gets the message encoder of the encoder factory that was passed to the constructor.
+ /// </summary>
+ /// <value>The encoder returned by the factory's <see cref="MessageEncoderFactory.Encoder"/> property.</value>
+ public MessageEncoder Encoder
+ {
+ get { return _encoderFactory.Encoder; }
+ }
+
+ /// <summary>
+ /// Gets the name of the destination (either a queue or a topic).
+ /// </summary>
+ /// <value>The name of the destination that was passed to the constructor.</value>
+ public string Destination
+ {
+ get { return _destination; }
+ }
+
+ /// <summary>
+ /// Gets the type of the destination.
+ /// </summary>
+ /// <value>The type of the destination that was passed to the constructor.</value>
+ public DestinationType DestinationType
+ {
+ get { return _destinationType; }
+ }
+
+ #endregion
+
+ #region Abort
+
+ /// <summary>
+ /// Called when the channel is aborted. This base implementation performs no work (aborting pending requests is still commented out).
+ /// </summary>
+ protected override void OnAbort()
+ {
+ //_pendingRequests.AbortAll();
+ }
+
+ #endregion
+
+ #region Close
+
+ /// <summary>
+ /// Begins an asynchronous close. The close itself is carried out synchronously by <see cref="OnClose"/> before this method returns.
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation (a <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>).
+ /// </returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">
+ /// <paramref name="timeout"/> is less than zero.</exception>
+ protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ OnClose(timeout);
+ return new CompletedAsyncResult(callback, state);
+ }
+
+ /// <summary>
+ /// Called when the channel is closed synchronously. This base implementation performs no work (aborting pending requests is still commented out).
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">
+ /// <paramref name="timeout"/> is less than zero.</exception>
+ protected override void OnClose(TimeSpan timeout)
+ {
+ //_pendingRequests.AbortAll();
+ }
+
+ /// <summary>
+ /// Completes an asynchronous operation on the close of a communication object.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)"/> method.</param>
+ /// <exception cref="T:System.TimeoutException">The interval of time that was allotted for the operation was exceeded before the operation was completed.</exception>
+ protected override void OnEndClose(IAsyncResult result)
+ {
+ CompletedAsyncResult.End(result);
+ }
+
+ #endregion
+
+ #region Open
+
+ /// <summary>
+ /// Begins an asynchronous open. The open itself is carried out synchronously by <see cref="OnOpen"/> before this method returns.
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+ /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+ /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+ /// <returns>
+ /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation (a <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>).
+ /// </returns>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">
+ /// <paramref name="timeout"/> is less than zero.</exception>
+ protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+ {
+ OnOpen(timeout);
+ return new CompletedAsyncResult(callback, state);
+ }
+
+ /// <summary>
+ /// Called when the channel is opened synchronously. This base implementation performs no work.
+ /// </summary>
+ /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+ /// <exception cref="T:System.ArgumentOutOfRangeException">
+ /// <paramref name="timeout"/> is less than zero.</exception>
+ /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception>
+ protected override void OnOpen(TimeSpan timeout)
+ {
+ }
+
+ /// <summary>
+ /// Completes an asynchronous operation on the open of a communication object.
+ /// </summary>
+ /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)"/> method.</param>
+ /// <exception cref="T:System.TimeoutException">The interval of time that was allotted for the operation was exceeded before the operation was completed.</exception>
+ protected override void OnEndOpen(IAsyncResult result)
+ {
+ CompletedAsyncResult.End(result);
+ }
+
+ #endregion
+
+ #region GetProperty
+
+ /// <summary>
+ /// Returns the typed object requested from this channel. A <see cref="FaultConverter"/> request is answered with the default converter for the message version of the encoder factory (SOAP 1.2 with WS-Addressing 1.0 when no factory was supplied); every other request is forwarded to the base class.
+ /// </summary>
+ /// <typeparam name="T">The type of the object that is being requested.</typeparam>
+ /// <returns>The default fault converter when <typeparamref name="T"/> is <see cref="FaultConverter"/>; otherwise whatever the base implementation returns.</returns>
+ public override T GetProperty<T>()
+ {
+ if(typeof(T) == typeof(FaultConverter))
+ {
+ return FaultConverter.GetDefaultFaultConverter((_encoderFactory != null) ? _encoderFactory.MessageVersion : MessageVersion.Soap12WSAddressing10) as T;
+ }
+ return base.GetProperty<T>();
+ }
+
+ #endregion
+
+ #region Private members
+
+ private readonly EndpointAddress _remoteAddress;
+ private readonly Uri _via;
+ private readonly BufferManager _bufferManager;
+ private readonly MessageEncoderFactory _encoderFactory;
+ private readonly string _destination;
+ private readonly DestinationType _destinationType;
+
+ // for request/reply pattern
+ //NullAsyncRequestContextCollection _pendingRequests = new NullAsyncRequestContextCollection();
+
+ #endregion
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+ /// <summary>
+ /// Client-side implementation of the sessioned one-way channel.
+ /// </summary>
+ internal class NmsOutputSessionChannel : NmsOutputChannel, IOutputSessionChannel
+ {
+ #region Constructors
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="NmsOutputSessionChannel"/> class. All arguments are forwarded to the <see cref="NmsOutputChannel"/> constructor.
+ /// </summary>
+ /// <param name="factory">The factory that created this channel.</param>
+ /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel. Note that it precedes <paramref name="address"/> here, unlike in the base constructor.</param>
+ /// <param name="address">The address of this channel.</param>
+ /// <param name="bufferManager">The buffer manager.</param>
+ /// <param name="encoderFactory">The encoder factory.</param>
+ /// <param name="destination">The name of the ActiveMQ destination.</param>
+ /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+ public NmsOutputSessionChannel(ChannelManagerBase factory, Uri via, EndpointAddress address, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+ : base(factory, address, via, bufferManager, encoderFactory, destination, destinationType)
+ {
+ }
+
+ #endregion
+
+ #region ISessionChannel<IOutputSession> Members
+
+ /// <summary>
+ /// Gets the session associated with this channel.
+ /// </summary>
+ /// <value>The <see cref="T:System.ServiceModel.Channels.IOutputSession"/> that was created together with this channel instance.</value>
+ /// <remarks>The same session object is returned on every call.</remarks>
+ public IOutputSession Session
+ {
+ get { return _session; }
+ }
+
+ /// <summary>
+ /// Internal implementation of a session, with tracking ID.
+ /// </summary>
+ private class OutputSession : IOutputSession, System.ServiceModel.Channels.ISession
+ {
+ private readonly string _sessionId = NmsChannelHelper.CreateUniqueSessionId();
+
+ /// <summary>
+ /// Gets the ID that uniquely identifies the session.
+ /// </summary>
+ /// <value>The ID obtained from <c>NmsChannelHelper.CreateUniqueSessionId</c> when the session was created.</value>
+ /// <remarks>The ID is assigned once and never changes afterwards.</remarks>
+ public string Id
+ {
+ get { return _sessionId; }
+ }
+ }
+
+ #endregion
+
+ #region Private members
+
+ private readonly IOutputSession _session = new OutputSession();
+
+ #endregion
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj Fri Dec 12 10:25:52 2008
@@ -66,12 +66,13 @@
</NoWarn>
</PropertyGroup>
<ItemGroup>
- <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, PublicKeyToken=2a329723af30bc8d, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>lib\Apache.NMS\net-3.5\Apache.NMS.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.configuration" />
+ <Reference Include="System.Data" />
<Reference Include="System.Runtime.Serialization">
<RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
@@ -121,12 +122,16 @@
<Compile Include="src\main\csharp\ConnectionFactoryManager.cs" />
<Compile Include="src\main\csharp\InputQueue.cs" />
<Compile Include="src\main\csharp\NmsAsyncResult.cs" />
- <Compile Include="src\main\csharp\NmsChannelBase.cs" />
+ <Compile Include="src\main\csharp\NmsInputQueueChannelBase.cs" />
<Compile Include="src\main\csharp\NmsChannelFactory.cs" />
<Compile Include="src\main\csharp\NmsChannelHelper.cs" />
- <Compile Include="src\main\csharp\NmsChannelListener.cs" />
+ <Compile Include="src\main\csharp\NmsInputChannelListener.cs" />
<Compile Include="src\main\csharp\NmsInputChannel.cs" />
+ <Compile Include="src\main\csharp\NmsInputSessionChannel.cs" />
+ <Compile Include="src\main\csharp\NmsInputSessionChannelListener.cs" />
<Compile Include="src\main\csharp\NmsOutputChannel.cs" />
+ <Compile Include="src\main\csharp\NmsOutputChannelBase.cs" />
+ <Compile Include="src\main\csharp\NmsOutputSessionChannel.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets" />
<PropertyGroup>
@@ -135,4 +140,4 @@
<PostBuildEvent>cd $(ProjectDir)
nant -nologo -q install-all -D:compile.skip=true</PostBuildEvent>
</PropertyGroup>
-</Project>
\ No newline at end of file
+</Project>