You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/11 01:02:48 UTC
svn commit: r537031 - in /incubator/qpid/trunk/qpid: ./ dotnet/
dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/HeadersExchange/
dotnet/Qpid.Client.Tests/Security/ dotnet/Qpid.Client.Tests/connection/
dotnet/Qpid.Client.Tests/requestreply1/ dotnet/Q...
Author: tomasr
Date: Thu May 10 16:02:46 2007
New Revision: 537031
URL: http://svn.apache.org/viewvc?view=rev&rev=537031
Log:
Merged revisions 537015-537026 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r537015 | tomasr | 2007-05-10 17:16:49 -0500 (Thu, 10 May 2007) | 1 line
QPID-435: Fix HeadersExchangeTest
........
r537019 | tomasr | 2007-05-10 17:25:01 -0500 (Thu, 10 May 2007) | 1 line
QPID-441 Fix handling of bounced messages
........
r537026 | tomasr | 2007-05-10 17:46:46 -0500 (Thu, 10 May 2007) | 1 line
QPID-398 SSL support for .NET client
........
Added:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/connection/QpidTestCert.pfx
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/connection/QpidTestCert.pfx
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/connection/SslConnectionTest.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/connection/SslConnectionTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQNoConsumersException.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AMQNoRouteException.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/SslOptions.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/SslOptions.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/IStreamFilter.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/IoHandler.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ISocketConnector.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SocketConnector.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/SslSocketConnector.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/lib/seclib-1.0.0/
- copied from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Common/lib/seclib-1.0.0/
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/lib/seclib-1.0.0/Org.Mentalis.Security.dll
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Common/lib/seclib-1.0.0/Org.Mentalis.Security.dll
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/lib/seclib-1.0.0/seclib-license.txt
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Common/lib/seclib-1.0.0/seclib-license.txt
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Mechanisms/ExternalSaslClientTests.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Sasl.Tests/Mechanisms/ExternalSaslClientTests.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Mechanisms/ExternalSaslClient.cs
- copied unchanged from r537026, incubator/qpid/branches/M2/dotnet/Qpid.Sasl/Mechanisms/ExternalSaslClient.cs
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/dotnet/NOTICE.txt
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/qms/BrokerInfo.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Protocol/AMQConstant.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/SaslTests.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/DefaultClientFactory.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Qpid.Sasl.csproj
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/dotnet/NOTICE.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/NOTICE.txt?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/NOTICE.txt (original)
+++ incubator/qpid/trunk/qpid/dotnet/NOTICE.txt Thu May 10 16:02:46 2007
@@ -20,3 +20,7 @@
Alexei A. Vorontsov or Copyright © 2000-2002 Philip A. Craig. Available
under terms based on the zlib/libpng licence. Available from
http://www.nunit.org/
+
+ - The Mentalis Security Library, Copyright © 2002-2006, , The Mentalis.org Team
+ under tterms based on the BSD license (http://www.mentalis.org/site/license.qpx).
+ Available from http://www.mentalis.org/soft/projects/seclib/
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersExchangeTest.cs Thu May 10 16:02:46 2007
@@ -96,7 +96,7 @@
_consumer = _channel.CreateConsumerBuilder(queueName)
.WithPrefetchLow(100)
.WithPrefetchHigh(500)
- .WithNoLocal(true)
+ .WithNoLocal(false) // make sure we get our own messages
.Create();
// Register this to listen for messages on the consumer.
@@ -188,7 +188,7 @@
SendTestMessage(msg, true);
}
- /// <summary>Check that a message matching only some fields of a headers exhcnage is not passed by the exchange.</summary>
+ /// <summary>Check that a message matching only some fields of a headers exchange is not passed by the exchange.</summary>
[Test]
public void TestMatchOneFails()
{
@@ -258,9 +258,9 @@
{
FieldTable matchTable = new FieldTable();
- // Currently all String matching must be prefixed by an "S" ("S" for string because of a failing of the FieldType definition).
- matchTable["Smatch1"] = "foo";
- matchTable["Smatch2"] = "";
+ matchTable["match1"] = "foo";
+ matchTable["match2"] = "";
+ matchTable["x-match"] = "all";
return matchTable;
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj Thu May 10 16:02:46 2007
@@ -46,6 +46,7 @@
<ItemGroup>
<Compile Include="bio\BlockingIo.cs" />
<Compile Include="connection\ConnectionTest.cs" />
+ <Compile Include="connection\SslConnectionTest.cs" />
<Compile Include="failover\FailoverTest.cs" />
<Compile Include="failover\FailoverTxTest.cs" />
<Compile Include="HeadersExchange\HeadersExchangeTest.cs" />
@@ -86,6 +87,7 @@
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
+ <EmbeddedResource Include="connection\QpidTestCert.pfx" />
<None Include="Qpid.Common.DLL.config">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/Security/CallbackHandlerRegistryTests.cs Thu May 10 16:02:46 2007
@@ -32,7 +32,7 @@
public void ParsesConfiguration()
{
CallbackHandlerRegistry registry = CallbackHandlerRegistry.Instance;
- Assert.AreEqual(3, registry.Mechanisms.Length);
+ Assert.AreEqual(4, registry.Mechanisms.Length);
Assert.Contains("TEST", registry.Mechanisms);
Type handlerType = registry.GetCallbackHandler("TEST");
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs Thu May 10 16:02:46 2007
@@ -40,7 +40,7 @@
private int _expectedMessageCount = NUM_MESSAGES;
- private long _startTime;
+ private long _startTime = 0;
private string _commandQueueName = "ServiceQ1";
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs Thu May 10 16:02:46 2007
@@ -26,69 +26,103 @@
namespace Qpid.Client.Tests
{
- [TestFixture]
- public class UndeliverableTest : BaseMessagingTestFixture
- {
- private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
-
- [SetUp]
- public override void Init()
- {
- base.Init();
-
- try
- {
- _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
- }
- catch (QpidException e)
- {
- _logger.Error("Could not add ExceptionListener", e);
- }
- }
-
- public static void OnException(Exception e)
- {
- // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message.
-
- _logger.Error("OnException handler received connection-level exception", e);
- if (e is QpidException)
- {
- QpidException qe = (QpidException)e;
- if (qe.InnerException is AMQUndeliveredException)
- {
- AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
- _logger.Error("inner exception is AMQUndeliveredException", ue);
- _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
-
- }
- }
- }
-
- [Test]
- public void SendUndeliverableMessage()
- {
- SendOne("default exchange", null);
- SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
- SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
- SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
-
- Thread.Sleep(1000); // Wait for message returns!
- }
-
- private void SendOne(string exchangeNameFriendly, string exchangeName)
- {
- _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
-
- // Send a test message to a non-existant queue on the default exchange. See if message is returned!
- MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
- .WithRoutingKey("Non-existant route key!")
- .WithMandatory(true);
- if (exchangeName != null)
+ /// <summary>
+ /// Tests that when sending undeliverable messages with the
+ /// mandatory flag set, an exception is raised on the connection
+ /// as the message is bounced back by the broker
+ /// </summary>
+ [TestFixture]
+ public class UndeliverableTest : BaseMessagingTestFixture
+ {
+ private static ILog _logger = LogManager.GetLogger(typeof(UndeliverableTest));
+ private ManualResetEvent _event;
+ public const int TIMEOUT = 1000;
+ private Exception _lastException;
+
+ [SetUp]
+ public override void Init()
+ {
+ base.Init();
+ _event = new ManualResetEvent(false);
+ _lastException = null;
+
+ try
+ {
+ _connection.ExceptionListener = new ExceptionListenerDelegate(OnException);
+ } catch ( QpidException e )
+ {
+ _logger.Error("Could not add ExceptionListener", e);
+ }
+ }
+
+ public void OnException(Exception e)
+ {
+ // Here we dig out the AMQUndelivered exception (if present) in order to log the returned message.
+
+ _lastException = e;
+ _logger.Error("OnException handler received connection-level exception", e);
+ if ( e is QpidException )
+ {
+ QpidException qe = (QpidException)e;
+ if ( qe.InnerException is AMQUndeliveredException )
{
- builder.WithExchangeName(exchangeName);
+ AMQUndeliveredException ue = (AMQUndeliveredException)qe.InnerException;
+ _logger.Error("inner exception is AMQUndeliveredException", ue);
+ _logger.Error(string.Format("Returned message = {0}", ue.GetUndeliveredMessage()));
}
- IMessagePublisher publisher = builder.Create();
- publisher.Send(_channel.CreateTextMessage("Hiya!"));
- }
- }
+ }
+ _event.Set();
+ }
+
+ [Test]
+ public void SendUndeliverableMessageOnDefaultExchange()
+ {
+ SendOne("default exchange", null);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnDirectExchange()
+ {
+ SendOne("direct exchange", ExchangeNameDefaults.DIRECT);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnTopicExchange()
+ {
+ SendOne("topic exchange", ExchangeNameDefaults.TOPIC);
+ }
+ [Test]
+ public void SendUndeliverableMessageOnHeadersExchange()
+ {
+ SendOne("headers exchange", ExchangeNameDefaults.HEADERS);
+ }
+
+ private void SendOne(string exchangeNameFriendly, string exchangeName)
+ {
+ _logger.Info("Sending undeliverable message to " + exchangeNameFriendly);
+
+ // Send a test message to a non-existant queue
+ // on the specified exchange. See if message is returned!
+ MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
+ .WithRoutingKey("Non-existant route key!")
+ .WithMandatory(true); // necessary so that the server bounces the message back
+ if ( exchangeName != null )
+ {
+ builder.WithExchangeName(exchangeName);
+ }
+ IMessagePublisher publisher = builder.Create();
+ publisher.Send(_channel.CreateTextMessage("Hiya!"));
+
+ // check we received an exception on the connection
+ // and that it is of the right type
+ _event.WaitOne(TIMEOUT, true);
+
+ Type expectedException = typeof(AMQUndeliveredException);
+ Exception ex = _lastException;
+ Assert.IsNotNull(ex, "No exception was thrown by the test. Expected " + expectedException);
+
+ if ( ex.InnerException != null )
+ ex = ex.InnerException;
+
+ Assert.IsInstanceOfType(expectedException, ex);
+ }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Thu May 10 16:02:46 2007
@@ -672,9 +672,9 @@
}
}
- public bool AttemptReconnection(String host, int port, bool useSSL)
+ public bool AttemptReconnection(String host, int port, SslOptions sslConfig)
{
- IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, useSSL);
+ IBrokerInfo bd = new AmqBrokerInfo("amqp", host, port, sslConfig);
_failoverPolicy.setBroker(bd);
@@ -708,10 +708,10 @@
_transport = LoadTransportFromAssembly(brokerDetail.getHost(), brokerDetail.getPort(), assemblyName, transportType);
*/
- _transport = new BlockingSocketTransport(brokerDetail.Host, brokerDetail.Port, this);
+ _transport = new BlockingSocketTransport();
// Connect.
- _transport.Open();
+ _transport.Connect(brokerDetail, this);
_protocolWriter = new ProtocolWriter(_transport.ProtocolWriter, _protocolListener);
_protocolSession = new AMQProtocolSession(_transport.ProtocolWriter, _transport, this);
_protocolListener.ProtocolSession = _protocolSession;
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqBrokerInfo.cs Thu May 10 16:02:46 2007
@@ -36,6 +36,7 @@
private int _port = 5672;
private string _transport = "amqp";
private Hashtable _options = new Hashtable();
+ private SslOptions _sslOptions;
public AmqBrokerInfo()
{
@@ -182,6 +183,21 @@
}
}
+ public AmqBrokerInfo(string transport, string host, int port, SslOptions sslConfig)
+ : this()
+ {
+ _transport = transport;
+ _host = host;
+ _port = port;
+
+ if ( sslConfig != null )
+ {
+ SetOption(BrokerInfoConstants.OPTIONS_SSL, "true");
+ _sslOptions = sslConfig;
+ }
+ }
+
+
public string Host
{
get { return _host; }
@@ -198,6 +214,11 @@
{
get { return _transport; }
set { _transport = value; }
+ }
+
+ public SslOptions SslOptions
+ {
+ get { return _sslOptions; }
}
public string GetOption(string key)
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 10 16:02:46 2007
@@ -28,6 +28,7 @@
using Qpid.Collections;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Protocol;
namespace Qpid.Client
{
@@ -568,8 +569,14 @@
if (_logger.IsDebugEnabled)
{
_logger.Debug("Message received in session with channel id " + _channelId);
- }
- _queue.EnqueueBlocking(message);
+ }
+ if ( message.DeliverBody == null )
+ {
+ ReturnBouncedMessage(message);
+ } else
+ {
+ _queue.EnqueueBlocking(message);
+ }
}
public int DefaultPrefetch
@@ -986,5 +993,42 @@
// FIXME: lock FailoverMutex here?
_connection.ProtocolWriter.Write(ackFrame);
}
+
+ /// <summary>
+ /// Handle a message that bounced from the server, creating
+ /// the corresponding exception and notifying the connection about it
+ /// </summary>
+ /// <param name="message">Unprocessed message</param>
+ private void ReturnBouncedMessage(UnprocessedMessage message)
+ {
+ try
+ {
+ AbstractQmsMessage bouncedMessage =
+ _messageFactoryRegistry.CreateMessage(
+ 0, false, message.ContentHeader,
+ message.Bodies
+ );
+
+ int errorCode = message.BounceBody.ReplyCode;
+ string reason = message.BounceBody.ReplyText;
+ _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")");
+ AMQException exception;
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ {
+ exception = new AMQNoConsumersException(reason, bouncedMessage);
+ } else if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ {
+ exception = new AMQNoRouteException(reason, bouncedMessage);
+ } else
+ {
+ exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage);
+ }
+ _connection.ExceptionReceived(exception);
+ } catch ( Exception ex )
+ {
+ _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex);
+ }
+
+ }
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Failover/FailoverHandler.cs Thu May 10 16:02:46 2007
@@ -97,7 +97,8 @@
// if _host has value then we are performing a redirect.
if (_host != null)
{
- failoverSucceeded = _connection.AttemptReconnection(_host, _port, false);
+ // todo: fix SSL support!
+ failoverSucceeded = _connection.AttemptReconnection(_host, _port, null);
}
else
{
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/BasicReturnMethodHandler.cs Thu May 10 16:02:46 2007
@@ -32,7 +32,7 @@
public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
{
- _logger.Debug("New JmsBounce method received");
+ _logger.Debug("New Basic.Return method received");
UnprocessedMessage msg = new UnprocessedMessage();
msg.DeliverBody = null;
msg.BounceBody = (BasicReturnBody) evt.Method;
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Handler/ChannelCloseMethodHandler.cs Thu May 10 16:02:46 2007
@@ -44,11 +44,20 @@
AMQFrame frame = ChannelCloseOkBody.CreateAMQFrame(evt.ChannelId);
evt.ProtocolSession.WriteFrame(frame);
- // HACK
+
if ( errorCode != AMQConstant.REPLY_SUCCESS.Code )
{
- _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
- evt.ProtocolSession.AMQConnection.ExceptionReceived(new AMQChannelClosedException(errorCode, "Error: " + reason));
+ _logger.Debug("Channel close received with errorCode " + errorCode + ", throwing exception");
+ if ( errorCode == AMQConstant.NO_CONSUMERS.Code )
+ throw new AMQNoConsumersException(reason);
+ if ( errorCode == AMQConstant.NO_ROUTE.Code )
+ throw new AMQNoRouteException(reason);
+ if ( errorCode == AMQConstant.INVALID_ARGUMENT.Code )
+ throw new AMQInvalidArgumentException(reason);
+ if ( errorCode == AMQConstant.INVALID_ROUTING_KEY.Code )
+ throw new AMQInvalidRoutingKeyException(reason);
+ // any other
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
evt.ProtocolSession.ChannelClosed(evt.ChannelId, errorCode, reason);
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Protocol/AMQProtocolSession.cs Thu May 10 16:02:46 2007
@@ -271,7 +271,7 @@
id = _queueId++;
}
- return "tmp_" + _connection.Transport.getLocalEndPoint() + "_" + id;
+ return "tmp_" + _connection.Transport.LocalEndpoint + "_" + id;
}
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Security/CallbackHandlerRegistry.cs Thu May 10 16:02:46 2007
@@ -92,6 +92,8 @@
if ( _mechanism2HandlerMap == null )
_mechanism2HandlerMap = new Hashtable();
+ if ( !_mechanism2HandlerMap.Contains(ExternalSaslClient.Mechanism) )
+ _mechanism2HandlerMap.Add(ExternalSaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(CramMD5SaslClient.Mechanism) )
_mechanism2HandlerMap.Add(CramMD5SaslClient.Mechanism, typeof(UsernamePasswordCallbackHandler));
if ( !_mechanism2HandlerMap.Contains(PlainSaslClient.Mechanism) )
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs Thu May 10 16:02:46 2007
@@ -50,19 +50,18 @@
public Queue Read()
{
ByteBuffer buffer = byteChannel.Read();
+ return DecodeAndTrace(buffer);
+ }
+
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return byteChannel.BeginRead(callback, state);
+ }
- Queue frames = Decode(buffer);
-
- // TODO: Refactor to decorator.
- if (_protocolTraceLog.IsDebugEnabled)
- {
- foreach (object o in frames)
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", o));
- }
- }
-
- return frames;
+ public Queue EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = byteChannel.EndRead(result);
+ return DecodeAndTrace(buffer);
}
public void Write(IDataBlock o)
@@ -72,8 +71,31 @@
{
_protocolTraceLog.Debug(String.Format("WRITE {0}", o));
}
-
+ // we should be doing an async write, but apparently
+ // the mentalis library doesn't queue async read/writes
+ // correctly and throws random IOException's. Stay sync for a while
+ //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
byteChannel.Write(Encode(o));
+ }
+
+ private void OnAsyncWriteDone(IAsyncResult result)
+ {
+ byteChannel.EndWrite(result);
+ }
+
+ private Queue DecodeAndTrace(ByteBuffer buffer)
+ {
+ Queue frames = Decode(buffer);
+
+ // TODO: Refactor to decorator.
+ if ( _protocolTraceLog.IsDebugEnabled )
+ {
+ foreach ( object o in frames )
+ {
+ _protocolTraceLog.Debug(String.Format("READ {0}", o));
+ }
+ }
+ return frames;
}
private ByteBuffer Encode(object o)
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IByteChannel.cs Thu May 10 16:02:46 2007
@@ -18,13 +18,54 @@
* under the License.
*
*/
+using System;
using Qpid.Buffer;
namespace Qpid.Client.Transport
{
- public interface IByteChannel
- {
- ByteBuffer Read();
- void Write(ByteBuffer buffer);
- }
+ /// <summary>
+ /// Represents input/output channels that read
+ /// and write <see cref="ByteBuffer"/> instances
+ /// </summary>
+ public interface IByteChannel
+ {
+ /// <summary>
+ /// Read a <see cref="ByteBuffer"/> from the underlying
+ /// network stream and any configured filters
+ /// </summary>
+ /// <returns>A ByteBuffer, if available</returns>
+ ByteBuffer Read();
+ /// <summary>
+ /// Begin an asynchronous read operation
+ /// </summary>
+ /// <param name="callback">Callback method to call when read operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous read operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned from <see cref="BeginRead"/></param>
+ /// <returns>The <see cref="ByteBuffer"/> read</returns>
+ ByteBuffer EndRead(IAsyncResult result);
+ /// <summary>
+ /// Write a <see cref="ByteBuffer"/> to the underlying network
+ /// stream, going through any configured filters
+ /// </summary>
+ /// <param name="buffer"></param>
+ void Write(ByteBuffer buffer);
+ /// <summary>
+ /// Begin an asynchronous write operation
+ /// </summary>
+ /// <param name="buffer">Buffer to write</param>
+ /// <param name="callback">A callback to call when the operation completes</param>
+ /// <param name="state">State object</param>
+ /// <returns>An <see cref="System.IAsyncResult"/> object</returns>
+ IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state);
+ /// <summary>
+ /// End an asynchronous write operation
+ /// </summary>
+ /// <param name="result">The <see cref="System.IAsyncResult"/> object returned by <see cref="BeginWrite"/></param>
+ void EndWrite(IAsyncResult result);
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs Thu May 10 16:02:46 2007
@@ -18,6 +18,7 @@
* under the License.
*
*/
+using System;
using System.Collections;
namespace Qpid.Client.Transport
@@ -25,5 +26,7 @@
public interface IProtocolChannel : IProtocolWriter
{
Queue Read();
+ IAsyncResult BeginRead(AsyncCallback callback, object state);
+ Queue EndRead(IAsyncResult result);
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/ITransport.cs Thu May 10 16:02:46 2007
@@ -18,14 +18,15 @@
* under the License.
*
*/
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
namespace Qpid.Client.Transport
{
public interface ITransport : IConnectionCloser
{
- void Open();
- string getLocalEndPoint();
+ void Connect(IBrokerInfo broker, AMQConnection connection);
+ string LocalEndpoint { get; }
IProtocolWriter ProtocolWriter { get; }
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketProcessor.cs Thu May 10 16:02:46 2007
@@ -1,117 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Net;
-using System.Net.Sockets;
-using log4net;
-using Qpid.Buffer;
-using Qpid.Client.Protocol;
-
-namespace Qpid.Client.Transport.Socket.Blocking
-{
- class BlockingSocketProcessor : IConnectionCloser
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketProcessor));
-
- string _host;
- int _port;
- System.Net.Sockets.Socket _socket;
- private NetworkStream _networkStream;
- IByteChannel _byteChannel;
- IProtocolListener _protocolListener;
-
- public BlockingSocketProcessor(string host, int port, IProtocolListener protocolListener)
- {
- _host = host;
- _port = port;
- _protocolListener = protocolListener;
- _byteChannel = new ByteChannel(this);
- }
-
- /// <summary>
- /// Synchronous blocking connect.
- /// </summary>
- public void Connect()
- {
- _socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
-
- IPHostEntry ipHostInfo = Dns.Resolve(_host); // Note: don't fix this warning. We do this for .NET 1.1 compatibility.
- IPAddress ipAddress = ipHostInfo.AddressList[0];
-
- IPEndPoint ipe = new IPEndPoint(ipAddress, _port);
-
- _socket.Connect(ipe);
- _networkStream = new NetworkStream(_socket, true);
- }
-
- public string getLocalEndPoint()
- {
- return _socket.LocalEndPoint.ToString();
- }
-
- public void Write(ByteBuffer byteBuffer)
- {
- try
- {
- _networkStream.Write(byteBuffer.Array, byteBuffer.Position, byteBuffer.Limit); // FIXME
- }
- catch (Exception e)
- {
- _log.Error("Write caused exception", e);
- _protocolListener.OnException(e);
- }
- }
-
- public ByteBuffer Read()
- {
- const int bufferSize = 4 * 1024; // TODO: Prevent constant allocation of buffers.
- byte[] bytes = new byte[bufferSize];
-
- int numOctets = _networkStream.Read(bytes, 0, bytes.Length);
-
- ByteBuffer byteBuffer = ByteBuffer.Wrap(bytes);
- byteBuffer.Limit = numOctets;
-
- byteBuffer.Flip();
-
- return byteBuffer;
- }
-
- public void Disconnect()
- {
- _networkStream.Flush();
- _networkStream.Close();
- _socket.Close();
- }
-
- public void Close()
- {
- Disconnect();
- }
-
- public IByteChannel ByteChannel
- {
- get { return _byteChannel; }
- }
- }
-}
-
-
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs Thu May 10 16:02:46 2007
@@ -20,101 +20,119 @@
*/
using System;
using System.Collections;
+using System.IO;
using System.Threading;
-using log4net;
+using Qpid.Client.Qms;
using Qpid.Client.Protocol;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
{
- public class BlockingSocketTransport : ITransport
- {
-// static readonly ILog _log = LogManager.GetLogger(typeof(BlockingSocketTransport));
-
- // Configuration variables.
- string _host;
- int _port;
- IProtocolListener _protocolListener;
-
- // Runtime variables.
- private BlockingSocketProcessor _socketProcessor;
- private AmqpChannel _amqpChannel;
-
- private ReaderRunner _readerRunner;
- private Thread _readerThread;
-
- public BlockingSocketTransport(string host, int port, AMQConnection connection)
- {
- _host = host;
- _port = port;
- _protocolListener = connection.ProtocolListener;
- }
-
- public void Open()
- {
- _socketProcessor = new BlockingSocketProcessor(_host, _port, _protocolListener);
- _socketProcessor.Connect();
- _amqpChannel = new AmqpChannel(_socketProcessor.ByteChannel);
- _readerRunner = new ReaderRunner(this);
- _readerThread = new Thread(new ThreadStart(_readerRunner.Run));
- _readerThread.Start();
- }
-
- public string getLocalEndPoint()
- {
- return _socketProcessor.getLocalEndPoint();
- }
-
- public void Close()
- {
- StopReaderThread();
- _socketProcessor.Disconnect();
- }
-
- public IProtocolChannel ProtocolChannel { get { return _amqpChannel; } }
- public IProtocolWriter ProtocolWriter { get { return _amqpChannel; } }
-
- public void StopReaderThread()
- {
- _readerRunner.Stop();
- }
-
- class ReaderRunner
- {
- BlockingSocketTransport _transport;
- bool _running = true;
+ /// <summary>
+ /// TCP Socket transport supporting both
+ /// SSL and non-SSL connections.
+ /// </summary>
+ public class BlockingSocketTransport : ITransport
+ {
+ // Configuration variables.
+ IProtocolListener _protocolListener;
+
+ // Runtime variables.
+ private ISocketConnector _connector;
+ private IoHandler _ioHandler;
+ private AmqpChannel _amqpChannel;
+ private ManualResetEvent _stopEvent;
+
+ public IProtocolWriter ProtocolWriter
+ {
+ get { return _amqpChannel; }
+ }
+ public string LocalEndpoint
+ {
+ get { return _connector.LocalEndpoint; }
+ }
+
+
+ /// <summary>
+ /// Connect to the specified broker
+ /// </summary>
+ /// <param name="broker">The broker to connect to</param>
+ /// <param name="connection">The AMQ connection</param>
+ public void Connect(IBrokerInfo broker, AMQConnection connection)
+ {
+ _stopEvent = new ManualResetEvent(false);
+ _protocolListener = connection.ProtocolListener;
+
+ _ioHandler = MakeBrokerConnection(broker, connection);
+ // todo: get default read size from config!
+
+ _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ // post an initial async read
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
+ }
+
+ /// <summary>
+ /// Close the broker connection
+ /// </summary>
+ public void Close()
+ {
+ StopReading();
+ CloseBrokerConnection();
+ }
+
+ private void StopReading()
+ {
+ _stopEvent.Set();
+ }
+
+ private void CloseBrokerConnection()
+ {
+ if ( _ioHandler != null )
+ {
+ _ioHandler.Dispose();
+ _ioHandler = null;
+ }
+ if ( _connector != null )
+ {
+ _connector.Dispose();
+ _connector = null;
+ }
+ }
+
+ private IoHandler MakeBrokerConnection(IBrokerInfo broker, AMQConnection connection)
+ {
+ if ( broker.UseSSL )
+ {
+ _connector = new SslSocketConnector();
+ } else
+ {
+ _connector = new SocketConnector();
+ }
+
+ Stream stream = _connector.Connect(broker);
+ return new IoHandler(stream, connection.ProtocolListener);
+ }
+
+ private void OnAsyncReadDone(IAsyncResult result)
+ {
+ try
+ {
+ Queue frames = _amqpChannel.EndRead(result);
+ // if we're not stopping, post a read again
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
- public ReaderRunner(BlockingSocketTransport transport)
+ // process results
+ foreach ( IDataBlock dataBlock in frames )
{
- _transport = transport;
+ _protocolListener.OnMessage(dataBlock);
}
-
- public void Run()
- {
- try
- {
- while (_running)
- {
- Queue frames = _transport.ProtocolChannel.Read();
-
- foreach (IDataBlock dataBlock in frames)
- {
- _transport._protocolListener.OnMessage(dataBlock);
- }
- }
- }
- catch (Exception e)
- {
- _transport._protocolListener.OnException(e);
- }
- }
-
- public void Stop()
- {
- // TODO: Check if this is thread safe. running is not volitile....
- _running = false;
- }
- }
- }
+ } catch ( Exception e )
+ {
+ _protocolListener.OnException(e);
+ }
+ }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/ByteChannel.cs Thu May 10 16:02:46 2007
@@ -29,16 +29,16 @@
// Warning: don't use this log for regular logging.
private static readonly ILog _ioTraceLog = LogManager.GetLogger("Qpid.Client.ByteChannel.Tracing");
- BlockingSocketProcessor processor;
+ private IByteChannel _lowerChannel;
- public ByteChannel(BlockingSocketProcessor processor)
+ public ByteChannel(IByteChannel lowerChannel)
{
- this.processor = processor;
+ _lowerChannel = lowerChannel;
}
public ByteBuffer Read()
{
- ByteBuffer result = processor.Read();
+ ByteBuffer result = _lowerChannel.Read();
// TODO: Move into decorator.
if (_ioTraceLog.IsDebugEnabled)
@@ -49,6 +49,21 @@
return result;
}
+ public IAsyncResult BeginRead(AsyncCallback callback, object state)
+ {
+ return _lowerChannel.BeginRead(callback, state);
+ }
+
+ public ByteBuffer EndRead(IAsyncResult result)
+ {
+ ByteBuffer buffer = _lowerChannel.EndRead(result);
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("READ {0}", buffer));
+ }
+ return buffer;
+ }
+
public void Write(ByteBuffer buffer)
{
// TODO: Move into decorator.
@@ -56,8 +71,22 @@
{
_ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
}
-
- processor.Write(buffer);
+
+ _lowerChannel.Write(buffer);
+ }
+
+ public IAsyncResult BeginWrite(ByteBuffer buffer, AsyncCallback callback, object state)
+ {
+ if ( _ioTraceLog.IsDebugEnabled )
+ {
+ _ioTraceLog.Debug(String.Format("WRITE {0}", buffer));
+ }
+ return _lowerChannel.BeginWrite(buffer, callback, state);
+ }
+
+ public void EndWrite(IAsyncResult result)
+ {
+ _lowerChannel.EndWrite(result);
}
- }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 10 16:02:46 2007
@@ -32,6 +32,10 @@
<SpecificVersion>False</SpecificVersion>
<HintPath>..\Qpid.Common\lib\log4net\log4net.dll</HintPath>
</Reference>
+ <Reference Include="Org.Mentalis.Security, Version=1.0.13.716, Culture=neutral, PublicKeyToken=085a8f6006888436">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Qpid.Common\lib\seclib-1.0.0\Org.Mentalis.Security.dll</HintPath>
+ </Reference>
<Reference Include="System" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
@@ -43,7 +47,10 @@
<Compile Include="Client\AMQDestination.cs" />
<Compile Include="Client\AmqChannel.cs" />
<Compile Include="Client\AMQAuthenticationException.cs" />
+ <Compile Include="Client\AMQNoConsumersException.cs" />
+ <Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+ <Compile Include="Client\SslOptions.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
<Compile Include="Client\BasicMessageConsumer.cs" />
@@ -97,14 +104,18 @@
<Compile Include="Client\State\StateWaiter.cs" />
<Compile Include="Client\Transport\AmqpChannel.cs" />
<Compile Include="Client\Transport\AMQProtocolProvider.cs" />
+ <Compile Include="Client\Transport\IStreamFilter.cs" />
+ <Compile Include="Client\Transport\IoHandler.cs" />
<Compile Include="Client\Transport\IByteChannel.cs" />
<Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
- <Compile Include="Client\Transport\Socket\Blocking\BlockingSocketProcessor.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\SslSocketConnector.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\SocketConnector.cs" />
+ <Compile Include="Client\Transport\Socket\Blocking\ISocketConnector.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="qms\BrokerInfo.cs" />
<Compile Include="qms\ConnectionInfo.cs" />
@@ -144,4 +155,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/qms/BrokerInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/qms/BrokerInfo.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/qms/BrokerInfo.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/qms/BrokerInfo.cs Thu May 10 16:02:46 2007
@@ -24,7 +24,7 @@
{
/// <summary>
/// Know URL option names.
- /// <seealso cref="ConnectionInfo"/>
+ /// <seealso cref="IConnectionInfo"/>
/// </summary>
public class BrokerInfoConstants
{
@@ -47,6 +47,7 @@
string Transport { get; set; }
bool UseSSL { get; set; }
long Timeout { get; set; }
+ SslOptions SslOptions { get; }
String GetOption(string key);
void SetOption(string key, string value);
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Protocol/AMQConstant.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Protocol/AMQConstant.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Protocol/AMQConstant.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Protocol/AMQConstant.cs Thu May 10 16:02:46 2007
@@ -77,11 +77,14 @@
public static readonly AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
public static readonly AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
public static readonly AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
- public static readonly AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
- public static readonly AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
public static readonly AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
public static readonly AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
public static readonly AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+ public static readonly AMQConstant ALREADY_EXISTS = new AMQConstant(405, "already exists", true);
+ public static readonly AMQConstant IN_USE = new AMQConstant(406, "in use", true);
+ public static readonly AMQConstant INVALID_ROUTING_KEY = new AMQConstant(407, "routing key invalid", true);
+ public static readonly AMQConstant REQUEST_TIMEOUT = new AMQConstant(408, "request timeout", true);
+ public static readonly AMQConstant INVALID_ARGUMENT = new AMQConstant(409, "argument invalid", true);
public static readonly AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
public static readonly AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
public static readonly AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Qpid.Common.csproj Thu May 10 16:02:46 2007
@@ -44,6 +44,8 @@
<Compile Include="AMQConnectionClosedException.cs" />
<Compile Include="AMQDisconnectedException.cs" />
<Compile Include="AMQException.cs" />
+ <Compile Include="AMQInvalidArgumentException.cs" />
+ <Compile Include="AMQInvalidRoutingKeyException.cs" />
<Compile Include="AMQUndeliveredException.cs" />
<Compile Include="AssemblySettings.cs" />
<Compile Include="Collections\LinkedHashtable.cs" />
@@ -208,4 +210,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/Qpid.Sasl.Tests.csproj Thu May 10 16:02:46 2007
@@ -38,6 +38,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
+ <Compile Include="Mechanisms\ExternalSaslClientTests.cs" />
<Compile Include="TestClientFactory.cs" />
<Compile Include="Mechanisms\AnonymousSaslClientTests.cs" />
<Compile Include="Mechanisms\DigestSaslClientTests.cs" />
@@ -63,4 +64,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/SaslTests.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/SaslTests.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/SaslTests.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl.Tests/SaslTests.cs Thu May 10 16:02:46 2007
@@ -78,6 +78,17 @@
}
[Test]
+ public void CanCreateExternal()
+ {
+ Hashtable props = new Hashtable();
+ string[] mechanisms = new string[] { "EXTERNAL", "OTHER" };
+ ISaslClient client = Sasl.CreateClient(mechanisms, "", "", "", props, this);
+
+ Assert.IsNotNull(client);
+ Assert.IsInstanceOfType(typeof(ExternalSaslClient), client);
+ }
+
+ [Test]
public void ReturnsNullIfNoFactoryFound()
{
Hashtable props = new Hashtable();
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/DefaultClientFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/DefaultClientFactory.cs?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/DefaultClientFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/DefaultClientFactory.cs Thu May 10 16:02:46 2007
@@ -21,7 +21,6 @@
using System;
using System.Collections;
-using System.Text;
using Qpid.Sasl.Mechanisms;
@@ -29,11 +28,12 @@
{
public class DefaultClientFactory : ISaslClientFactory
{
- private static readonly string[] SUPPORTED = new string[] {
+ private static readonly string[] SUPPORTED = new string[] {
DigestSaslClient.Mechanism,
CramMD5SaslClient.Mechanism,
PlainSaslClient.Mechanism,
AnonymousSaslClient.Mechanism,
+ ExternalSaslClient.Mechanism,
};
public string[] GetSupportedMechanisms(IDictionary props)
@@ -52,6 +52,7 @@
vetoed.Add(CramMD5SaslClient.Mechanism);
vetoed.Add(PlainSaslClient.Mechanism);
vetoed.Add(AnonymousSaslClient.Mechanism);
+ vetoed.Add(ExternalSaslClient.Mechanism);
}
if ( props.Contains(SaslProperties.PolicyNoAnonymous) )
{
@@ -85,6 +86,8 @@
return new AnonymousSaslClient(authorizationId, props, handler);
case DigestSaslClient.Mechanism:
return new DigestSaslClient(authorizationId, serverName, protocol, props, handler);
+ case ExternalSaslClient.Mechanism:
+ return new ExternalSaslClient(authorizationId, props, handler);
}
}
// unknown mechanism
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Qpid.Sasl.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Qpid.Sasl.csproj?view=diff&rev=537031&r1=537030&r2=537031
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Qpid.Sasl.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Sasl/Qpid.Sasl.csproj Thu May 10 16:02:46 2007
@@ -38,6 +38,7 @@
<Compile Include="Configuration\SaslConfiguration.cs" />
<Compile Include="Configuration\SaslConfigurationSectionHandler.cs" />
<Compile Include="MD5HMAC.cs" />
+ <Compile Include="Mechanisms\ExternalSaslClient.cs" />
<Compile Include="SaslException.cs" />
<Compile Include="Mechanisms\AnonymousSaslClient.cs" />
<Compile Include="Mechanisms\DigestSaslClient.cs" />
@@ -60,4 +61,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
\ No newline at end of file