You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [2/19] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/
dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/
dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...
Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/LICENSE
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/LICENSE?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/LICENSE (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/LICENSE Fri Feb 8 02:09:37 2008
@@ -201,3 +201,150 @@
See the License for the specific language governing permissions and
limitations under the License.
+
+=========================================================================
+
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
+
+==========================================================================
+ AMQP License
+=========================================================================
+ Copyright Notice
+ ================
+ (c) Copyright JPMorgan Chase Bank & Co., Cisco Systems, Inc., Envoy Technologies Inc.,
+ iMatix Corporation, IONA\ufffd Technologies, Red Hat, Inc.,
+ TWIST Process Innovations, and 29West Inc. 2006. All rights reserved.
+
+ License
+ =======
+ JPMorgan Chase Bank & Co., Cisco Systems, Inc., Envoy Technologies Inc., iMatix
+ Corporation, IONA Technologies, Red Hat, Inc., TWIST Process Innovations, and
+ 29West Inc. (collectively, the "Authors") each hereby grants to you a worldwide,
+ perpetual, royalty-free, nontransferable, nonexclusive license to
+ (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol
+ ("AMQP") Specification and (ii) the Licensed Claims that are held by
+ the Authors, all for the purpose of implementing the Advanced Messaging
+ Queue Protocol Specification. Your license and any rights under this
+ Agreement will terminate immediately without notice from
+ any Author if you bring any claim, suit, demand, or action related to
+ the Advanced Messaging Queue Protocol Specification against any Author.
+ Upon termination, you shall destroy all copies of the Advanced Messaging
+ Queue Protocol Specification in your possession or control.
+
+ As used hereunder, "Licensed Claims" means those claims of a patent or
+ patent application, throughout the world, excluding design patents and
+ design registrations, owned or controlled, or that can be sublicensed
+ without fee and in compliance with the requirements of this
+ Agreement, by an Author or its affiliates now or at any
+ future time and which would necessarily be infringed by implementation
+ of the Advanced Messaging Queue Protocol Specification. A claim is
+ necessarily infringed hereunder only when it is not possible to avoid
+ infringing it because there is no plausible non-infringing alternative
+ for implementing the required portions of the Advanced Messaging Queue
+ Protocol Specification. Notwithstanding the foregoing, Licensed Claims
+ shall not include any claims other than as set forth above even if
+ contained in the same patent as Licensed Claims; or that read solely
+ on any implementations of any portion of the Advanced Messaging Queue
+ Protocol Specification that are not required by the Advanced Messaging
+ Queue Protocol Specification, or that, if licensed, would require a
+ payment of royalties by the licensor to unaffiliated third parties.
+ Moreover, Licensed Claims shall not include (i) any enabling technologies
+ that may be necessary to make or use any Licensed Product but are not
+ themselves expressly set forth in the Advanced Messaging Queue Protocol
+ Specification (e.g., semiconductor manufacturing technology, compiler
+ technology, object oriented technology, networking technology, operating
+ system technology, and the like); or (ii) the implementation of other
+ published standards developed elsewhere and merely referred to in the
+ body of the Advanced Messaging Queue Protocol Specification, or
+ (iii) any Licensed Product and any combinations thereof the purpose or
+ function of which is not required for compliance with the Advanced
+ Messaging Queue Protocol Specification. For purposes of this definition,
+ the Advanced Messaging Queue Protocol Specification shall be deemed to
+ include both architectural and interconnection requirements essential
+ for interoperability and may also include supporting source code artifacts
+ where such architectural, interconnection requirements and source code
+ artifacts are expressly identified as being required or documentation to
+ achieve compliance with the Advanced Messaging Queue Protocol Specification.
+
+ As used hereunder, "Licensed Products" means only those specific portions
+ of products (hardware, software or combinations thereof) that implement
+ and are compliant with all relevant portions of the Advanced Messaging
+ Queue Protocol Specification.
+
+ The following disclaimers, which you hereby also acknowledge as to any
+ use you may make of the Advanced Messaging Queue Protocol Specification:
+
+ THE ADVANCED MESSAGING QUEUE PROTOCOL SPECIFICATION IS PROVIDED "AS IS,"
+ AND THE AUTHORS MAKE NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+ IMPLIED, INCLUDING, BUT NOT LIMITED TO, WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, OR TITLE; THAT THE
+ CONTENTS OF THE ADVANCED MESSAGING QUEUE PROTOCOL SPECIFICATION ARE
+ SUITABLE FOR ANY PURPOSE; NOR THAT THE IMPLEMENTATION OF THE ADVANCED
+ MESSAGING QUEUE PROTOCOL SPECIFICATION WILL NOT INFRINGE ANY THIRD PARTY
+ PATENTS, COPYRIGHTS, TRADEMARKS OR OTHER RIGHTS.
+
+ THE AUTHORS WILL NOT BE LIABLE FOR ANY DIRECT, INDIRECT, SPECIAL,
+ INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF OR RELATING TO ANY
+ USE, IMPLEMENTATION OR DISTRIBUTION OF THE ADVANCED MESSAGING QUEUE
+ PROTOCOL SPECIFICATION.
+
+ The name and trademarks of the Authors may NOT be used in any manner,
+ including advertising or publicity pertaining to the Advanced Messaging
+ Queue Protocol Specification or its contents without specific, written
+ prior permission. Title to copyright in the Advanced Messaging Queue
+ Protocol Specification will at all times remain with the Authors.
+
+ No other rights are granted by implication, estoppel or otherwise.
+
+ Upon termination of your license or rights under this Agreement, you
+ shall destroy all copies of the Advanced Messaging Queue Protocol
+ Specification in your possession or control.
+
+ Trademarks
+ ==========
+ "JPMorgan", "JPMorgan Chase", "Chase", the JPMorgan Chase logo and the
+ Octagon Symbol are trademarks of JPMorgan Chase & Co.
+
+ IMATIX and the iMatix logo are trademarks of iMatix Corporation sprl.
+
+ IONA, IONA Technologies, and the IONA logos are trademarks of IONA
+ Technologies PLC and/or its subsidiaries.
+
+ LINUX is a trademark of Linus Torvalds. RED HAT and JBOSS are registered
+ trademarks of Red Hat, Inc. in the US and other countries.
+
+ Java, all Java-based trademarks and OpenOffice.org are trademarks of
+ Sun Microsystems, Inc. in the United States, other countries, or both.
+
+ Other company, product, or service names may be trademarks or service
+ marks of others.
+
+ Links to full AMQP specification:
+ =================================
+ http://www.envoytech.org/spec/amq/
+ http://www.iona.com/opensource/amqp/
+ http://www.redhat.com/solutions/specifications/amqp/
+ http://www.twiststandards.org/tiki-index.php?page=AMQ
+ http://www.imatix.com/amqp
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer.Tests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer.Tests/Properties/AssemblyInfo.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer.Tests/Properties/AssemblyInfo.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer.Tests/Properties/AssemblyInfo.cs Fri Feb 8 02:09:37 2008
@@ -1,4 +1,4 @@
-using System.Reflection;
+using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
@@ -6,11 +6,11 @@
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Apache.Qpid.Buffer.Tests")]
-[assembly: AssemblyDescription("")]
+[assembly: AssemblyDescription("Built from svn revision number: ")]
[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("")]
+[assembly: AssemblyCompany("Apache Software Foundation")]
[assembly: AssemblyProduct("Apache.Qpid.Buffer.Tests")]
-[assembly: AssemblyCopyright("Copyright © 2007")]
+[assembly: AssemblyCopyright("Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -31,5 +31,5 @@
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
-[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyVersion("2.1.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer/Properties/AssemblyInfo.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer/Properties/AssemblyInfo.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Buffer/Properties/AssemblyInfo.cs Fri Feb 8 02:09:37 2008
@@ -1,4 +1,4 @@
-/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,11 +24,11 @@
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Apache.Qpid.ByteBuffer")]
-[assembly: AssemblyDescription("")]
+[assembly: AssemblyDescription("Built from svn revision number: ")]
[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Apache Qpid")]
+[assembly: AssemblyCompany("Apache Software Foundation")]
[assembly: AssemblyProduct("Apache.Qpid.ByteBuffer")]
-[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")]
+[assembly: AssemblyCopyright("Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -49,5 +49,5 @@
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
-[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyVersion("2.1.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/Properties/AssemblyInfo.cs Fri Feb 8 02:09:37 2008
@@ -1,4 +1,4 @@
-/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -27,11 +27,11 @@
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Apache.Qpid.Client.Tests")]
-[assembly: AssemblyDescription("Test Suite for Qpid Clients")]
+[assembly: AssemblyDescription("Built from svn revision number: ")]
[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Apache Qpid")]
+[assembly: AssemblyCompany("Apache Software Foundation")]
[assembly: AssemblyProduct("Apache.Qpid.Client.Tests")]
-[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")]
+[assembly: AssemblyCopyright("Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -50,4 +50,4 @@
// Build Number
// Revision
//
-[assembly: AssemblyVersion("0.5.*")]
+[assembly: AssemblyVersion("2.1.0.0")]
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/default.build
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/default.build?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/default.build (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Tests/default.build Fri Feb 8 02:09:37 2008
@@ -1,44 +1,39 @@
<?xml version="1.0"?>
<project name="Apache.Qpid.Client" default="test">
- <target name="build">
+ <target name="build">
<csc target="library"
- define="${build.defines}"
- warnaserror="false" debug="${build.debug}"
- output="${build.dir}/${project::get-name()}.Tests.dll">
-
+ define="${build.defines}"
+ warnaserror="false" debug="${build.debug}"
+ output="${build.dir}/${project::get-name()}.Tests.dll">
+
<sources>
<include name="**/*.cs" />
</sources>
<references>
- <include name="${build.dir}/log4net.dll" />
- <include name="${build.dir}/nunit.framework.dll" />
- <include name="${build.dir}/${project::get-name()}.dll" />
- <include name="${build.dir}/Apache.Qpid.Common.dll" />
- <include name="${build.dir}/Apache.Qpid.Messaging.dll" />
- <include name="${build.dir}/Apache.Qpid.Sasl.dll" />
+ <include name="${build.dir}/log4net.dll" />
+ <include name="${build.dir}/nunit.framework.dll" />
+ <include name="${build.dir}/${project::get-name()}.dll" />
+ <include name="${build.dir}/Apache.Qpid.Common.dll" />
+ <include name="${build.dir}/Apache.Qpid.Messaging.dll" />
+ <include name="${build.dir}/Apache.Qpid.Sasl.dll" />
</references>
</csc>
- <copy
- tofile="${build.dir}/${project::get-name()}.Tests.dll.config"
- file="App.config"
- />
- <copy
- todir="${build.dir}"
- file="log4net.config"
- />
+ <copy tofile="${build.dir}/${project::get-name()}.Tests.dll.config" file="App.config" />
+ <copy todir="${build.dir}" file="log4net.config"/>
</target>
<target name="test" depends="build">
- <nunit2>
+ <nunit2 verbose="true">
<formatter type="${nant.formatter}" usefile="false" />
<test>
- <assemblies>
- <include name="${build.dir}/${project::get-name()}.tests.dll"/>
- </assemblies>
- <categories>
- <exclude name="Failover"/>
- <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/>
- </categories>
+ <assemblies>
+ <include name="${build.dir}/${project::get-name()}.tests.dll"/>
+ </assemblies>
+ <categories>
+ <!-- The fail-over tests are interactive so should not be run as part of the build. -->
+ <exclude name="Integration"/>
+ <exclude name="SSL" if="${framework::get-target-framework() == 'mono-2.0'}"/>
+ </categories>
</test>
</nunit2>
</target>
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/ByteChannel.cs Fri Feb 8 02:09:37 2008
@@ -27,7 +27,7 @@
class ByteChannel : IByteChannel
{
// Warning: don't use this log for regular logging.
- private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
+ private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel");
BlockingSocketProcessor processor;
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client.Transport.Socket.Blocking/Properties/AssemblyInfo.cs Fri Feb 8 02:09:37 2008
@@ -1,4 +1,4 @@
-/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,11 +24,11 @@
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Apache.Qpid.Transport.Blocking")]
-[assembly: AssemblyDescription("")]
+[assembly: AssemblyDescription("Built from svn revision number: ")]
[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Apache Qpid")]
+[assembly: AssemblyCompany("Apache Software Foundation")]
[assembly: AssemblyProduct("Apache.Qpid.Transport.Blocking")]
-[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")]
+[assembly: AssemblyCopyright("Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -49,5 +49,5 @@
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
-[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyVersion("2.1.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Fri Feb 8 02:09:37 2008
@@ -816,10 +816,7 @@
if (ProtocolInitiation.CURRENT_PROTOCOL_VERSION_MAJOR != 7)
{
// Basic.Qos frame appears to not be supported by OpenAMQ 1.0d.
- _protocolWriter.SyncWrite(
- BasicQosBody.CreateAMQFrame(
- channelId, (uint)prefetchHigh, 0, false),
- typeof (BasicQosOkBody));
+ _protocolWriter.SyncWrite(BasicQosBody.CreateAMQFrame(channelId, 0, (ushort)prefetchHigh, false), typeof (BasicQosOkBody));
}
if (transacted)
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Fri Feb 8 02:09:37 2008
@@ -33,26 +33,43 @@
namespace Apache.Qpid.Client
{
+ /// <summary>
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Declare queues.
+ /// <tr><td> Declare exchanges.
+ /// <tr><td> Bind queues to exchanges.
+ /// <tr><td> Create messages.
+ /// <tr><td> Set up message consumers on the channel.
+ /// <tr><td> Set up message producers on the channel.
+ /// <tr><td> Commit the current transaction.
+ /// <tr><td> Roll-back the current transaction.
+ /// <tr><td> Close the channel.
+ /// </table>
+ /// </summary>
public class AmqChannel : Closeable, IChannel
{
private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel));
internal const int BASIC_CONTENT_TYPE = 60;
+ public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+
+ public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
+
private static int _nextSessionNumber = 0;
+ private AMQConnection _connection;
+
private int _sessionNumber;
+
private bool _suspended;
+
private object _suspensionLock = new object();
// Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- public const int DEFAULT_PREFETCH_HIGH_MARK = 5000;
- public const int DEFAULT_PREFETCH_LOW_MARK = 2500;
-
- private AMQConnection _connection;
-
private bool _transacted;
private AcknowledgeMode _acknowledgeMode;
@@ -60,6 +77,7 @@
private ushort _channelId;
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
private FlowControlQueue _queue;
@@ -68,14 +86,10 @@
private MessageFactoryRegistry _messageFactoryRegistry;
- /// <summary>
- /// Set of all producers created by this session
- /// </summary>
+ /// <summary> Holds all of the producers created by this channel. </summary>
private Hashtable _producers = Hashtable.Synchronized(new Hashtable());
- /// <summary>
- /// Maps from consumer tag to JMSMessageConsumer instance
- /// </summary>
+ /// <summary> Holds all of the consumers created by this channel. </summary>
private Hashtable _consumers = Hashtable.Synchronized(new Hashtable());
private ArrayList _replayFrames = new ArrayList();
@@ -90,157 +104,253 @@
private long _nextProducerId;
/// <summary>
- /// Responsible for decoding a message fragment and passing it to the appropriate message consumer.
+ /// Initializes a new instance of the <see cref="AmqChannel"/> class.
/// </summary>
- private class Dispatcher
- {
- private int _stopped = 0;
-
- private AmqChannel _containingChannel;
+ /// <param name="con">The connection.</param>
+ /// <param name="channelId">The channel id.</param>
+ /// <param name="transacted">if set to <c>true</c> [transacted].</param>
+ /// <param name="acknowledgeMode">The acknowledge mode.</param>
+ /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
+ /// <param name="defaultPrefetchLow">Default prefetch low value</param>
+ internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode,
+ int defaultPrefetchHigh, int defaultPrefetchLow)
+ : this()
+ {
+ _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
+ _connection = con;
+ _transacted = transacted;
- public Dispatcher(AmqChannel containingChannel)
+ if ( transacted )
{
- _containingChannel = containingChannel;
- }
-
- /// <summary>
- /// Runs the dispatcher. This is intended to be Run in a separate thread.
- /// </summary>
- public void RunDispatcher()
+ _acknowledgeMode = AcknowledgeMode.SessionTransacted;
+ }
+ else
{
- UnprocessedMessage message;
-
- while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
- {
- //_queue.size()
- DispatchMessage(message);
- }
-
- _logger.Debug("Dispatcher thread terminating for channel " + _containingChannel._channelId);
+ _acknowledgeMode = acknowledgeMode;
}
- private void DispatchMessage(UnprocessedMessage message)
+ _channelId = channelId;
+ _defaultPrefetchHighMark = defaultPrefetchHigh;
+ _defaultPrefetchLowMark = defaultPrefetchLow;
+
+ if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
{
- if (message.DeliverBody != null)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
+ _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark,
+ new ThresholdMethod(OnPrefetchLowMark),
+ new ThresholdMethod(OnPrefetchHighMark));
+ }
+ else
+ {
+ // low and upper are the same
+ _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark,
+ null, null);
+ }
+ }
- if (consumer == null)
- {
- _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
- }
- else
- {
- consumer.NotifyMessage(message, _containingChannel.ChannelId);
- }
- }
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
- CreateMessage(0, false, message.ContentHeader, message.Bodies);
-
- int errorCode = message.BounceBody.ReplyCode;
- string reason = message.BounceBody.ReplyText;
- _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ private AmqChannel()
+ {
+ _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+ }
- _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
- catch (Exception e)
- {
- _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
+ /// <summary>
+ /// Acknowledge mode for messages received.
+ /// </summary>
+ public AcknowledgeMode AcknowledgeMode
+ {
+ get
+ {
+ CheckNotClosed();
+ return _acknowledgeMode;
}
+ }
- public void StopDispatcher()
+ /// <summary>
+ /// True if the channel should use transactions.
+ /// </summary>
+ public bool Transacted
+ {
+ get
{
- Interlocked.Exchange(ref _stopped, 1);
+ CheckNotClosed();
+ return _transacted;
}
}
/// <summary>
- /// Initializes a new instance of the <see cref="AmqChannel"/> class.
+ /// Prefetch value to be used as the default for
+ /// consumers created on this channel.
/// </summary>
- /// <param name="con">The connection.</param>
- /// <param name="channelId">The channel id.</param>
- /// <param name="transacted">if set to <c>true</c> [transacted].</param>
- /// <param name="acknowledgeMode">The acknowledge mode.</param>
- /// <param name="defaultPrefetchHigh">Default prefetch high value</param>
- /// <param name="defaultPrefetchLow">Default prefetch low value</param>
- internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
- : this()
+ public int DefaultPrefetch
{
- _sessionNumber = Interlocked.Increment(ref _nextSessionNumber);
- _connection = con;
- _transacted = transacted;
- if ( transacted )
- {
- _acknowledgeMode = AcknowledgeMode.SessionTransacted;
- } else
- {
- _acknowledgeMode = acknowledgeMode;
- }
- _channelId = channelId;
- _defaultPrefetchHighMark = defaultPrefetchHigh;
- _defaultPrefetchLowMark = defaultPrefetchLow;
-
- if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
- {
- _queue = new FlowControlQueue(
- _defaultPrefetchLowMark, _defaultPrefetchHighMark,
- new ThresholdMethod(OnPrefetchLowMark),
- new ThresholdMethod(OnPrefetchHighMark)
- );
- } else
- {
- // low and upper are the same
- _queue = new FlowControlQueue(
- _defaultPrefetchHighMark, _defaultPrefetchHighMark,
- null, null
- );
- }
+ get { return DefaultPrefetchHigh; }
}
- private AmqChannel()
+ /// <summary>
+ /// Prefetch low value to be used as the default for
+ /// consumers created on this channel.
+ /// </summary>
+ public int DefaultPrefetchLow
{
- _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry();
+ get { return _defaultPrefetchLowMark; }
}
/// <summary>
- /// Create a disconnected channel that will fault
- /// for most things, but is useful for testing
+ /// Prefetch high value to be used as the default for
+ /// consumers created on this channel.
/// </summary>
- /// <returns>A new disconnected channel</returns>
- public static IChannel CreateDisconnectedChannel()
+ public int DefaultPrefetchHigh
+ {
+ get { return _defaultPrefetchHighMark; }
+ }
+
+ /// <summary> Indicates whether or not this channel is currently suspended. </summary>
+ public bool IsSuspended
{
- return new AmqChannel();
+ get { return _suspended; }
}
+ /// <summary> Provides the channels number within the the connection. </summary>
+ public ushort ChannelId
+ {
+ get { return _channelId; }
+ }
- public IBytesMessage CreateBytesMessage()
+ /// <summary> Provides the connection that this channel runs over. </summary>
+ public AMQConnection Connection
{
- return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ get { return _connection; }
}
+ /// <summary>
+ /// Declare a new exchange.
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
+ public void DeclareExchange(String exchangeName, String exchangeClass)
+ {
+ _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
+
+ DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
+ }
+
+ /// <summary>
+ /// Declare a new exchange using the default exchange class.
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ public void DeleteExchange(string exchangeName)
+ {
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Declare a new queue with the specified set of arguments.
+ /// </summary>
+ /// <param name="queueName">Name of the queue</param>
+ /// <param name="isDurable">True if the queue should be durable</param>
+ /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+ /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
+ public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
+ {
+ DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
+ }
+
+ /// <summary>
+ /// Delete a queue with the specifies arguments.
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+ /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
+ {
+ DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
+ }
+
+ /// <summary>
+ /// Generate a new Unique name to use for a queue.
+ /// </summary>
+ /// <returns>A unique name to this channel</returns>
+ public string GenerateUniqueName()
+ {
+ string result = _connection.ProtocolSession.GenerateQueueName();
+ return Regex.Replace(result, "[^a-z0-9_]", "_");
+ }
+
+ /// <summary>
+ /// Removes all messages from a queue.
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ public void PurgeQueue(string queueName, bool noWait)
+ {
+ DoPurgeQueue(queueName, noWait);
+ }
+
+ /// <summary>
+ /// Bind a queue to the specified exchange.
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ public void Bind(string queueName, string exchangeName, string routingKey)
+ {
+ DoBind(queueName, exchangeName, routingKey, new FieldTable());
+ }
+
+ /// <summary>
+ /// Bind a queue to the specified exchange.
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
+ public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
+ {
+ DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
+ }
+
+ /// <summary>
+ /// Create a new empty message with no body.
+ /// </summary>
+ /// <returns>The new message</returns>
public IMessage CreateMessage()
{
- // TODO: this is supposed to create a message consisting only of message headers
return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
}
-
+
+ /// <summary>
+ /// Create a new message of the specified MIME type.
+ /// </summary>
+ /// <param name="mimeType">The mime type to create</param>
+ /// <returns>The new message</returns>
public IMessage CreateMessage(string mimeType)
{
- return _messageFactoryRegistry.CreateMessage(mimeType);
+ return _messageFactoryRegistry.CreateMessage(mimeType);
}
+ /// <summary>
+ /// Creates a new message for bytes (application/octet-stream).
+ /// </summary>
+ /// <returns>The new message</returns>
+ public IBytesMessage CreateBytesMessage()
+ {
+ return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream");
+ }
+
+ /// <summary>
+ /// Creates a new text message (text/plain) with empty content.
+ /// </summary>
+ /// <returns>The new message</returns>
public ITextMessage CreateTextMessage()
{
return CreateTextMessage(String.Empty);
}
+ /// <summary>
+ /// Creates a new text message (text/plain) with a body.
+ /// </summary>
+ /// <param name="text">Initial body of the message</param>
+ /// <returns>The new message</returns>
public ITextMessage CreateTextMessage(string text)
{
ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain");
@@ -248,24 +358,92 @@
return msg;
}
- public bool Transacted
+ /// <summary>
+ /// Creates a new Consumer using the builder pattern.
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <returns>The builder object</returns>
+ public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
{
- get
- {
- CheckNotClosed();
- return _transacted;
- }
+ return new MessageConsumerBuilder(this, queueName);
}
- public AcknowledgeMode AcknowledgeMode
+ /// <summary>
+ /// Creates a new consumer.
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <param name="prefetchLow">Low prefetch value</param>
+ /// <param name="prefetchHigh">High prefetch value</param>
+ /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+ /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+ /// <returns>The new consumer</returns>
+ public IMessageConsumer CreateConsumer(string queueName,
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive)
{
- get
- {
- CheckNotClosed();
- return _acknowledgeMode;
- }
+ _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ",
+ queueName, prefetchLow, prefetchHigh, noLocal, exclusive));
+
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive);
+ }
+
+ /// <summary>
+ /// Unsubscribe from a queue.
+ /// </summary>
+ /// <param name="subscriptionName">Subscription name</param>
+ public void Unsubscribe(String name)
+ {
+ throw new NotImplementedException();
}
+ /// <summary>
+ /// Create a new message publisher using the builder pattern.
+ /// </summary>
+ /// <returns>The builder object</returns>
+ public MessagePublisherBuilder CreatePublisherBuilder()
+ {
+ return new MessagePublisherBuilder(this);
+ }
+
+ /// <summary>
+ /// Create a new message publisher.
+ /// </summary>
+ /// <param name="exchangeName">Name of exchange to publish to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="deliveryMode">Default delivery mode</param>
+ /// <param name="timeToLive">Default TTL time of messages</param>
+ /// <param name="immediate">If true, sent immediately</param>
+ /// <param name="mandatory">If true, the broker will return an error
+ /// (as a connection exception) if the message cannot be delivered</param>
+ /// <param name="priority">Default message priority</param>
+ /// <returns>The new message publisher</returns>
+ public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
+ long timeToLive, bool immediate, bool mandatory, int priority)
+ {
+ _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
+ exchangeName, "none", routingKey));
+
+ return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
+ timeToLive, immediate, mandatory, priority);
+ }
+
+ /// <summary>
+ /// Recover after transaction failure.
+ /// </summary>
+ /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks>
+ public void Recover()
+ {
+ CheckNotClosed();
+ CheckNotTransacted();
+
+ throw new NotImplementedException();
+ }
+
+ /// <summary>
+ /// Commit the transaction.
+ /// </summary>
public void Commit()
{
// FIXME: Fail over safety. Needs FailoverSupport?
@@ -291,32 +469,50 @@
}
}
+ /// <summary>
+ /// Rollback the transaction.
+ /// </summary>
public void Rollback()
{
- lock ( _suspensionLock )
- {
- CheckTransacted(); // throws IllegalOperationException if not a transacted session
-
- try
- {
- bool suspended = IsSuspended;
- if ( !suspended )
- Suspend(true);
+ lock (_suspensionLock)
+ {
+ CheckTransacted(); // throws IllegalOperationException if not a transacted session
+
+ try
+ {
+ bool suspended = IsSuspended;
+ if (!suspended)
+ {
+ Suspend(true);
+ }
- // todo: rollback dispatcher when TX support is added
- //if ( _dispatcher != null )
- // _dispatcher.Rollback();
-
- _connection.ConvenientProtocolWriter.SyncWrite(
- TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
-
- if ( !suspended )
- Suspend(false);
- } catch ( AMQException e )
- {
- throw new QpidException("Failed to rollback", e);
- }
- }
+ // todo: rollback dispatcher when TX support is added
+ //if ( _dispatcher != null )
+ // _dispatcher.Rollback();
+
+ _connection.ConvenientProtocolWriter.SyncWrite(
+ TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+
+ if ( !suspended )
+ {
+ Suspend(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new QpidException("Failed to rollback", e);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Create a disconnected channel that will fault
+ /// for most things, but is useful for testing
+ /// </summary>
+ /// <returns>A new disconnected channel</returns>
+ public static IChannel CreateDisconnectedChannel()
+ {
+ return new AmqChannel();
}
public override void Close()
@@ -349,6 +545,56 @@
}
}
+ /**
+ * Called when the server initiates the closure of the session
+ * unilaterally.
+ * @param e the exception that caused this session to be closed. Null causes the
+ */
+ public void ClosedWithException(Exception e)
+ {
+ lock (_connection.FailoverMutex)
+ {
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ SetClosed();
+ AMQException amqe;
+
+ if (e is AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+
+ _connection.DeregisterSession(_channelId);
+ CloseProducersAndConsumers(amqe);
+ }
+ }
+
+ public void MessageReceived(UnprocessedMessage message)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Message received in session with channel id " + _channelId);
+ }
+
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ }
+ else
+ {
+ _queue.Enqueue(message);
+ }
+ }
+
+ public void Dispose()
+ {
+ Close();
+ }
+
private void SetClosed()
{
Interlocked.Exchange(ref _closed, CLOSED);
@@ -378,32 +624,6 @@
}
}
- /**
- * Called when the server initiates the closure of the session
- * unilaterally.
- * @param e the exception that caused this session to be closed. Null causes the
- */
- public void ClosedWithException(Exception e)
- {
- lock (_connection.FailoverMutex)
- {
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- SetClosed();
- AMQException amqe;
- if (e is AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException("Closing session forcibly", e);
- }
- _connection.DeregisterSession(_channelId);
- CloseProducersAndConsumers(amqe);
- }
- }
-
/// <summary>
/// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
/// currently no way of propagating errors to message producers (this is a JMS limitation).
@@ -411,6 +631,7 @@
private void CloseProducers()
{
_logger.Debug("Closing producers on session " + this);
+
// we need to clone the list of producers since the close() method updates the _producers collection
// which would result in a concurrent modification exception
ArrayList clonedProducers = new ArrayList(_producers.Values);
@@ -420,19 +641,20 @@
_logger.Debug("Closing producer " + prod);
prod.Close();
}
+
// at this point the _producers map is empty
}
/// <summary>
/// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
/// <param name="error">not null if this is a result of an error occurring at the connection level</param>
- ///
private void CloseConsumers(Exception error)
{
if (_dispatcher != null)
{
_dispatcher.StopDispatcher();
}
+
// we need to clone the list of consumers since the close() method updates the _consumers collection
// which would result in a concurrent modification exception
ArrayList clonedConsumers = new ArrayList(_consumers.Values);
@@ -448,33 +670,11 @@
con.Close();
}
}
- // at this point the _consumers map will be empty
- }
-
- public void Recover()
- {
- CheckNotClosed();
- CheckNotTransacted(); // throws IllegalOperationException if not a transacted session
-
- // TODO: This cannot be implemented using 0.8 semantics
- throw new NotImplementedException();
- }
-
- public void Run()
- {
- throw new NotImplementedException();
- }
- public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode,
- long timeToLive, bool immediate, bool mandatory, int priority)
- {
- _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}",
- exchangeName, "none", routingKey));
- return CreateProducerImpl(exchangeName, routingKey, deliveryMode,
- timeToLive, immediate, mandatory, priority);
+ // at this point the _consumers map will be empty
}
- public IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
+ private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey,
DeliveryMode deliveryMode,
long timeToLive, bool immediate, bool mandatory, int priority)
{
@@ -496,32 +696,21 @@
}
}
- public IMessageConsumer CreateConsumer(string queueName,
- int prefetchLow,
- int prefetchHigh,
- bool noLocal,
- bool exclusive,
- bool durable,
- string subscriptionName)
- {
- _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5} subscriptionName={6}",
- queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName));
- return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive, durable, subscriptionName);
- }
-
+ /// <summary> Creates a message consumer on this channel.</summary>
+ ///
+ /// <param name="queueName">The name of the queue to attach the consumer to.</param>
+ /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param>
+ /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param>
+ /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param>
+ /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param>
+ ///
+ /// <return>The message consumer.</return>
private IMessageConsumer CreateConsumerImpl(string queueName,
int prefetchLow,
int prefetchHigh,
bool noLocal,
- bool exclusive,
- bool durable,
- string subscriptionName)
+ bool exclusive)
{
- if (durable || subscriptionName != null)
- {
- throw new NotImplementedException(); // TODO: durable subscriptions.
- }
-
lock (_closingLock)
{
CheckNotClosed();
@@ -542,11 +731,6 @@
}
}
- public void Unsubscribe(String name)
- {
- throw new NotImplementedException(); // FIXME
- }
-
private void CheckTransacted()
{
if (!Transacted)
@@ -562,55 +746,7 @@
throw new InvalidOperationException("Channel is transacted");
}
}
-
- public void MessageReceived(UnprocessedMessage message)
- {
- if (_logger.IsDebugEnabled)
- {
- _logger.Debug("Message received in session with channel id " + _channelId);
- }
- if ( message.DeliverBody == null )
- {
- ReturnBouncedMessage(message);
- } else
- {
- _queue.Enqueue(message);
- }
- }
-
- public int DefaultPrefetch
- {
- get { return DefaultPrefetchHigh; }
- }
- public int DefaultPrefetchLow
- {
- get { return _defaultPrefetchLowMark; }
- }
- public int DefaultPrefetchHigh
- {
- get { return _defaultPrefetchHighMark; }
- }
- public bool IsSuspended
- {
- get { return _suspended; }
- }
-
- public ushort ChannelId
- {
- get
- {
- return _channelId;
- }
- }
-
- public AMQConnection Connection
- {
- get
- {
- return _connection;
- }
- }
-
+
internal void Start()
{
_dispatcher = new Dispatcher(this);
@@ -658,11 +794,6 @@
return ++_nextProducerId;
}
- public void Dispose()
- {
- Close();
- }
-
/**
* Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
* failover when the client has veoted resubscription.
@@ -714,11 +845,6 @@
// at this point the _consumers map will be empty
}
- public void PurgeQueue(string queueName, bool noWait)
- {
- DoPurgeQueue(queueName, noWait);
- }
-
private void DoPurgeQueue(string queueName, bool noWait)
{
try
@@ -757,29 +883,19 @@
/// Callers must hold the failover mutex before calling this method.
/// </summary>
/// <param name="consumer"></param>
- void RegisterConsumer(BasicMessageConsumer consumer)
+ private void RegisterConsumer(BasicMessageConsumer consumer)
{
String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal,
- consumer.Exclusive, consumer.AcknowledgeMode);
+ consumer.Exclusive, consumer.AcknowledgeMode);
consumer.ConsumerTag = consumerTag;
_consumers.Add(consumerTag, consumer);
}
- public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args)
- {
- DoBind(queueName, exchangeName, routingKey, (FieldTable)args);
- }
-
- public void Bind(string queueName, string exchangeName, string routingKey)
- {
- DoBind(queueName, exchangeName, routingKey, new FieldTable());
- }
-
internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args)
{
_logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}",
- queueName, exchangeName, routingKey, args));
+ queueName, exchangeName, routingKey, args));
AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0,
queueName, exchangeName,
@@ -798,9 +914,9 @@
String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++);
AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0,
- queueName, tag, noLocal,
- acknowledgeMode == AcknowledgeMode.NoAcknowledge,
- exclusive, true, new FieldTable());
+ queueName, tag, noLocal,
+ acknowledgeMode == AcknowledgeMode.NoAcknowledge,
+ exclusive, true, new FieldTable());
_replayFrames.Add(basicConsume);
@@ -808,34 +924,24 @@
return tag;
}
- public void DeleteExchange(string exchangeName)
- {
- throw new NotImplementedException(); // FIXME
- }
-
- public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
- {
- DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
- }
-
private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
try
{
_logger.Debug(string.Format("DeleteQueue name={0}", queueName));
- AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
- queueName, // queueName
- ifUnused, // IfUnUsed
- ifEmpty, // IfEmpty
- noWait); // NoWait
+ AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait);
_replayFrames.Add(queueDelete);
if (noWait)
+ {
_connection.ProtocolWriter.Write(queueDelete);
+ }
else
+ {
_connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
+ }
}
catch (AMQException)
{
@@ -843,34 +949,12 @@
}
}
- public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
- {
- return new MessageConsumerBuilder(this, queueName);
- }
-
- public MessagePublisherBuilder CreatePublisherBuilder()
- {
- return new MessagePublisherBuilder(this);
- }
-
- public string GenerateUniqueName()
- {
- string result = _connection.ProtocolSession.GenerateQueueName();
- return Regex.Replace(result, "[^a-z0-9_]", "_");
- }
-
- public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
- {
- DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete);
- }
-
private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete)
{
_logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}",
queueName, isDurable, isExclusive, isAutoDelete));
- AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName,
- false, isDurable, isExclusive,
+ AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive,
isAutoDelete, true, null);
_replayFrames.Add(queueDeclare);
@@ -881,23 +965,16 @@
}
}
- public void DeclareExchange(String exchangeName, String exchangeClass)
- {
- _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass));
-
- DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null);
- }
-
// AMQP-level method.
private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName,
string exchangeClass, bool passive, bool durable,
bool autoDelete, bool xinternal, bool noWait, FieldTable args)
{
_logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}",
- _channelId, exchangeName, exchangeClass));
+ _channelId, exchangeName, exchangeClass));
- AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(
- channelId, ticket, exchangeName, exchangeClass, passive, durable, autoDelete, xinternal, noWait, args);
+ AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive,
+ durable, autoDelete, xinternal, noWait, args);
_replayFrames.Add(declareExchange);
@@ -911,7 +988,7 @@
else
{
throw new NotImplementedException("Don't use nowait=false with DeclareExchange");
-// _connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
+ //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody));
}
}
@@ -935,75 +1012,167 @@
_connection.ProtocolWriter.Write(ackFrame);
}
- /// <summary>
- /// Handle a message that bounced from the server, creating
- /// the corresponding exception and notifying the connection about it
- /// </summary>
- /// <param name="message">Unprocessed message</param>
- private void ReturnBouncedMessage(UnprocessedMessage message)
- {
- try
- {
- AbstractQmsMessage bouncedMessage =
- _messageFactoryRegistry.CreateMessage(
- 0, false, message.ContentHeader,
- message.Bodies
- );
-
- int errorCode = message.BounceBody.ReplyCode;
- string reason = message.BounceBody.ReplyText;
- _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
- AMQException exception;
- if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
- {
- exception = new AMQNoConsumersException(reason, bouncedMessage);
- } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
- {
- exception = new AMQNoRouteException(reason, bouncedMessage);
- } else
- {
- exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
- }
- _connection.ExceptionReceived(exception);
- } catch ( Exception ex )
- {
- _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
- }
-
- }
-
- private void OnPrefetchLowMark(int count)
- {
- if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
- {
- _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
- Suspend(false);
- }
- }
- private void OnPrefetchHighMark(int count)
- {
- if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge )
- {
- _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
- Suspend(true);
- }
- }
-
- private void Suspend(bool suspend)
- {
- lock ( _suspensionLock )
- {
- if ( _logger.IsDebugEnabled )
- {
- _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
- }
-
- _suspended = suspend;
- AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
-
- Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody));
- }
- }
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies);
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+
+ if (errorCode == AMQConstant.NO_CONSUMERS.Code)
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ }
+ else if (errorCode == AMQConstant.NO_ROUTE.Code)
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ }
+ else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+
+ _connection.ExceptionReceived(exception);
+ }
+ catch (Exception ex)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+ }
+
+ private void OnPrefetchLowMark(int count)
+ {
+ if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
+ {
+ _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count);
+ Suspend(false);
+ }
+ }
+
+ private void OnPrefetchHighMark(int count)
+ {
+ if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge)
+ {
+ _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count);
+ Suspend(true);
+ }
+ }
+
+ private void Suspend(bool suspend)
+ {
+ lock (_suspensionLock)
+ {
+ if (_logger.IsDebugEnabled)
+ {
+ _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended"));
+ }
+
+ _suspended = suspend;
+ AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend);
+
+ Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody));
+ }
+ }
+
+ /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers.
+ /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message
+ /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new
+ /// message.
+ ///
+ /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be
+ /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a
+ /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/>
+ /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/>
+ /// </table>
+ /// </summary>
+ ///
+ /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message.
+ /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on
+ /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
+ ///
+ /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
+ /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks>
+ private class Dispatcher
+ {
+ /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>
+ private int _stopped = 0;
+
+ /// <summary> The channel for which this is a dispatcher. </summary>
+ private AmqChannel _containingChannel;
+
+ /// <summary> Creates a dispatcher on the specified channel. </summary>
+ ///
+ /// <param name="containingChannel"> The channel on which this is a dispatcher. </param>
+ public Dispatcher(AmqChannel containingChannel)
+ {
+ _containingChannel = containingChannel;
+ }
+
+ /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and
+ /// the connection of bounced messages.</summary>
+ public void RunDispatcher()
+ {
+ UnprocessedMessage message;
+
+ while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null)
+ {
+ if (message.DeliverBody != null)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag];
+
+ if (consumer == null)
+ {
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
+ }
+ else
+ {
+ consumer.NotifyMessage(message, _containingChannel.ChannelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ // Bounced message is processed here, away from the transport thread
+ AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry.
+ CreateMessage(0, false, message.ContentHeader, message.Bodies);
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+ _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
+ }
+ catch (Exception e)
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
+ }
+ }
+ }
+
+ _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + ".");
+ }
+ /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary>
+ public void StopDispatcher()
+ {
+ Interlocked.Exchange(ref _stopped, 1);
+ }
+ }
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Fri Feb 8 02:09:37 2008
@@ -34,9 +34,7 @@
private bool _noLocal;
- /**
- * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
- */
+ /** Holds the exclusive status flag for the consumers access to its queue. */
private bool _exclusive;
public bool Exclusive
@@ -448,6 +446,5 @@
break;
}
}
-
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Closeable.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Closeable.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Closeable.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Closeable.cs Fri Feb 8 02:09:37 2008
@@ -19,29 +19,46 @@
*
*/
using System;
+using Apache.Qpid.Messaging;
namespace Apache.Qpid.Client
{
- public abstract class Closeable
+ /// <summary>Closeable provides monitoring of the state of a closeable resource; whether it is open or closed. It also provides a lock on which
+ /// attempts to close the resource from multiple threads can be coordinated.
+ ///
+ /// <p/><table id="crc"><caption>CRC Card</caption>
+ /// <tr><th> Responsibilities <th> Collaborations
+ /// <tr><td> Close (and clean-up) a resource.
+ /// <tr><td> Monitor the state of a closeable resource.
+ /// <tr><td> Synchronous attempts to close resource from concurrent threads.
+ /// </table>
+ /// </summary>
+ ///
+ /// <remarks>Poor encapsulation of the close lock. Better to completely hide the implementation, such that there is a method, e.g., DoSingleClose,
+ /// that sub-classes implement. Guaranteed to only be called by one thread at once, and iff the object is not already closed. That is, multiple
+ /// simultaneous closes will result in a single call to the real close method. Put the wait and condition checking loop in this base class.
+ /// </remarks>
+ public abstract class Closeable : ICloseable
{
- /// <summary>
- /// Used to ensure orderly closing of the object. The only method that is allowed to be called
- /// from another thread of control is close().
- /// </summary>
+ /// <summary> Constant representing the closed state. </summary>
+ protected const int CLOSED = 1;
+
+ /// <summary> Constant representing the open state. </summary>
+ protected const int NOT_CLOSED = 2;
+
+ /// <summary> Used to ensure orderly closing of the object. </summary>
protected readonly object _closingLock = new object();
- /// <summary>
- /// All access to this field should be using the Inerlocked class, to make it atomic.
- /// Hence it is an int since you cannot use a bool with the Interlocked class.
- /// </summary>
+ /// <summary> Indicates the state of this resource; open or closed. </summary>
protected int _closed = NOT_CLOSED;
- protected const int CLOSED = 1;
- protected const int NOT_CLOSED = 2;
-
/// <summary>
/// Checks the not closed.
/// </summary>
+ ///
+ /// <remarks>Don't like check methods that throw exceptions. a) it can come as a surprise without checked exceptions, b) it limits the
+ /// callers choice, if the caller would prefer a boolean, c) it is not side-effect free programming, where such could be used. Get rid
+ /// of this and replace with boolean.</remarks>
protected void CheckNotClosed()
{
if (_closed == CLOSED)
@@ -50,9 +67,7 @@
}
}
- /// <summary>
- /// Gets a value indicating whether this <see cref="Closeable"/> is closed.
- /// </summary>
+ /// <summary>Indicates whether this resource is closed.</summary>
/// <value><c>true</c> if closed; otherwise, <c>false</c>.</value>
public bool Closed
{
@@ -62,10 +77,7 @@
}
}
- /// <summary>
- /// Close the resource
- /// </summary>
- /// <exception cref="QpidMessagingException">If something goes wrong</exception>
+ /// <summary> Close this resource. </summary>
public abstract void Close();
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Handler/ConnectionStartMethodHandler.cs Fri Feb 8 02:09:37 2008
@@ -76,6 +76,7 @@
clientProperties["product"] = "Apache.Qpid.NET";
clientProperties["version"] = "1.0";
clientProperties["platform"] = GetFullSystemInfo();
+ clientProperties["instance"] = ps.ClientID;
AMQFrame frame = ConnectionStartOkBody.CreateAMQFrame(
evt.ChannelId, clientProperties, selectedMechanism,
saslResponse, selectedLocale);
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs Fri Feb 8 02:09:37 2008
@@ -59,6 +59,8 @@
private AMQConnection _connection;
+ public string ClientID { get { return _connection.ClientID; } }
+
public AMQProtocolSession(IProtocolWriter protocolWriter, IConnectionCloser connectionCloser, AMQConnection connection)
{
_protocolWriter = protocolWriter;
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs Fri Feb 8 02:09:37 2008
@@ -31,7 +31,7 @@
public class AmqpChannel : IProtocolChannel
{
// Warning: don't use this log for regular logging.
- static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
IByteChannel _byteChannel;
IProtocolEncoder _encoder;
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/ProtocolDecoderOutput.cs Fri Feb 8 02:09:37 2008
@@ -35,7 +35,7 @@
internal class ProtocolDecoderOutput : IProtocolDecoderOutput
{
private IProtocolListener _protocolListener;
- static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
+ static readonly ILog _protocolTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ProtocolChannel");
public ProtocolDecoderOutput(IProtocolListener protocolListener)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs Fri Feb 8 02:09:37 2008
@@ -27,7 +27,7 @@
class ByteChannel : IByteChannel
{
// Warning: don't use this log for regular logging.
- private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
+ private static readonly ILog _ioTraceLog = LogManager.GetLogger("TRACE.Qpid.Client.ByteChannel");
private IByteChannel _lowerChannel;
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Properties/AssemblyInfo.cs?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Properties/AssemblyInfo.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Properties/AssemblyInfo.cs Fri Feb 8 02:09:37 2008
@@ -1,4 +1,4 @@
-/*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -24,11 +24,11 @@
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Apache.Qpid.Client")]
-[assembly: AssemblyDescription("Qpid Client API implementation")]
+[assembly: AssemblyDescription("Built from svn revision number: ")]
[assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Apache Qpid")]
+[assembly: AssemblyCompany("Apache Software Foundation")]
[assembly: AssemblyProduct("Apache.Qpid.Client")]
-[assembly: AssemblyCopyright("Copyright (c) 2006 The Apache Software Foundation")]
+[assembly: AssemblyCopyright("Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -49,4 +49,4 @@
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
-[assembly: AssemblyVersion("0.5.*")]
+[assembly: AssemblyVersion("2.1.0.0")]