You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/18 02:18:27 UTC
svn commit: r539191 - in /incubator/qpid/branches/M2/dotnet:
Qpid.Client.Tests/ Qpid.Client.Tests/Channel/ Qpid.Client/
Qpid.Client/Client/ Qpid.Client/Client/Handler/ Qpid.Client/Client/State/
Qpid.Messaging/
Author: tomasr
Date: Thu May 17 17:18:26 2007
New Revision: 539191
URL: http://svn.apache.org/viewvc?view=rev&rev=539191
Log:
QPID-490 (Contributed by Carlos Medina) Implement PurgeQueue and DeleteQueue
Added:
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
Modified:
incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
Added: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using System.Threading;
+using log4net;
+using Qpid.Client.Qms;
+using Qpid.Messaging;
+using NUnit.Framework;
+
+namespace Qpid.Client.Tests.Channel
+{
+ /// <summary>
+ /// Test the queue methods
+ /// </summary>
+ [TestFixture]
+ public class ChannelQueueTest
+ {
+
+ private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
+
+ /// <summary> The default AMQ connection URL to use for tests. </summary>
+ const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+ const string _routingKey = "ServiceQ1";
+
+ private ExceptionListenerDelegate _exceptionDelegate;
+ private AutoResetEvent _evt = new AutoResetEvent(false);
+ private Exception _lastException = null;
+
+ private IMessageConsumer _consumer;
+ private IMessagePublisher _publisher;
+ private IChannel _channel;
+ private IConnection _connection;
+
+ private string _queueName;
+
+ [SetUp]
+ public virtual void Init()
+ {
+ _logger.Info("public virtual void Init(): called");
+
+ // Create a connection to the broker.
+ IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
+ _connection = new AMQConnection(connectionInfo);
+ _logger.Info("Starting...");
+
+ // Register this to listen for exceptions on the test connection.
+ _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+ _connection.ExceptionListener += _exceptionDelegate;
+
+ // Establish a session on the broker.
+ _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+ // Create a durable, non-temporary, non-exclusive queue.
+ _queueName = _channel.GenerateUniqueName();
+ _channel.DeclareQueue(_queueName, true, false, false);
+
+ _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
+
+ // Clear the most recent message and exception.
+ _lastException = null;
+ }
+
+ [TearDown]
+ public virtual void ShutDown()
+ {
+ _logger.Info("public virtual void Shutdown(): called");
+
+ if (_connection != null)
+ {
+ _logger.Info("Disposing connection.");
+ _connection.Dispose();
+ _logger.Info("Connection disposed.");
+ }
+ }
+
+ [Test]
+ public void DeleteInExistentQueue()
+ {
+ try
+ {
+ _channel.DeleteQueue("Q1", false, false, true);
+ _logger.Info("queue deleted");
+ }
+ catch (AMQException e)
+ {
+ _logger.Info(e.ToString());
+ }
+ }
+
+ [Test]
+ public void DeleteUsedQueue()
+ {
+ // Create the consumer
+ _consumer = _channel.CreateConsumerBuilder(_queueName)
+ .WithPrefetchLow(100)
+ .Create();
+ _logger.Info("Consumer was created...");
+
+ // delete the queue
+ _channel.DeleteQueue(_queueName, false, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteUnUsedQueue()
+ {
+ // delete the queue
+ _channel.DeleteQueue(_queueName, true, true, true);
+ _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteNonEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+ SendTestMessage("Message 1");
+
+ try
+ {
+ _channel.DeleteQueue(_queueName, true, false, true);
+ }
+ catch (AMQException)
+ {
+ Assert.Fail("The test fails");
+ }
+ }
+
+ [Test]
+ public void DeleteEmptyQueue()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ // delete an empty queue with ifEmpty = true
+ _channel.DeleteQueue(_queueName, false, true, true);
+
+ Assert.IsNull(_lastException);
+ }
+
+ [Test]
+ public void DeleteQueueWithResponse()
+ {
+ // Create the publisher
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Publisher created...");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ // delete the queue, the server must respond
+ _channel.DeleteQueue(_queueName, false, false, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, false);
+ }
+
+ [Test]
+ public void PurgeQueueWithOutResponse()
+ {
+ _publisher = _channel.CreatePublisherBuilder()
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_routingKey)
+ .Create();
+ _logger.Info("Pubisher created");
+
+ SendTestMessage("Message 1");
+ SendTestMessage("Message 2");
+
+ _channel.PurgeQueue(_queueName, true);
+ }
+
+
+ /// <summary>
+ /// Callback method to handle any exceptions raised by the test connection.</summary> ///
+ /// <param name="e">The connection exception.</param>
+ public void OnException(Exception e)
+ {
+ // Preserve the most recent exception in case test cases need to examine it.
+ _lastException = e;
+
+ // Notify any waiting threads that an exception event has occurred.
+ _evt.Set();
+ }
+
+ /// <summary>
+ /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+ /// depending on whether or not the message should be received by the consumer.
+ ///
+ /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+ /// </summary>
+ ///
+ /// <param name="msgSend">The message to send.</param>
+ private void SendTestMessage(string msg)
+ {
+ // create the IMessage object
+ IMessage msgSend = _channel.CreateTextMessage(msg);
+
+ // send the message
+ _publisher.Send(msgSend);
+ _logger.InfoFormat("The messages \"{0}\" was sent", msg);
+ }
+
+ }
+}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj Thu May 17 17:18:26 2007
@@ -46,6 +46,7 @@
<ItemGroup>
<Compile Include="BrokerDetails\BrokerDetailsTest.cs" />
<Compile Include="Channel\ChannelMessageCreationTests.cs" />
+ <Compile Include="Channel\ChannelQueueTest.cs" />
<Compile Include="Messages\MessageFactoryRegistryTests.cs" />
<Compile Include="connection\ConnectionTest.cs" />
<Compile Include="connection\SslConnectionTest.cs" />
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 17 17:18:26 2007
@@ -122,7 +122,7 @@
if (consumer == null)
{
- _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+ _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
}
else
{
@@ -673,6 +673,30 @@
// at this point the _consumers map will be empty
}
+ public void PurgeQueue(string queueName, bool noWait)
+ {
+ DoPurgeQueue(queueName, noWait);
+ }
+
+ private void DoPurgeQueue(string queueName, bool noWait)
+ {
+ try
+ {
+ _logger.DebugFormat("PurgeQueue {0}", queueName);
+
+ AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+
+ if (noWait)
+ _connection.ProtocolWriter.Write(purgeQueue);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
+ }
+
/**
* Replays frame on fail over.
*
@@ -748,9 +772,34 @@
throw new NotImplementedException(); // FIXME
}
- public void DeleteQueue()
+ public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
{
- throw new NotImplementedException(); // FIXME
+ DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
+ }
+
+ private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
+ {
+ try
+ {
+ _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+
+ AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
+ queueName, // queueName
+ ifUnused, // IfUnUsed
+ ifEmpty, // IfEmpty
+ noWait); // NoWait
+
+ _replayFrames.Add(queueDelete);
+
+ if (noWait)
+ _connection.ProtocolWriter.Write(queueDelete);
+ else
+ _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
+ }
+ catch (AMQException)
+ {
+ throw;
+ }
}
public MessageConsumerBuilder CreateConsumerBuilder(string queueName)
Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+ public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
+ {
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
+
+ public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+ {
+ QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
+ if (body != null)
+ {
+ _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
+ }
+ }
+
+ }
+}
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs Thu May 17 17:18:26 2007
@@ -65,6 +65,8 @@
IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+ IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+ IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
// We need to register a map for the null (i.e. all state) handlers otherwise you get
// a stack overflow in the handler searching code when you present it with a frame for which
@@ -96,6 +98,8 @@
open[typeof(ConnectionCloseBody)] = connectionClose;
open[typeof(BasicDeliverBody)] = basicDeliver;
open[typeof(BasicReturnBody)] = basicReturn;
+ open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+ open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
_state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
}
{
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 17 17:18:26 2007
@@ -50,6 +50,8 @@
<Compile Include="Client\AMQNoConsumersException.cs" />
<Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+ <Compile Include="Client\Handler\QueueDeleteOkMethodHandler.cs" />
+ <Compile Include="Client\Handler\QueuePurgeOkMethodHandler.cs" />
<Compile Include="Client\SslOptions.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs Thu May 17 17:18:26 2007
@@ -68,14 +68,25 @@
/// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete);
/// <summary>
- /// Deletes a queue (todo: fix)
+ /// Delete a queue with the specifies arguments
/// </summary>
- void DeleteQueue();
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+ /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait);
/// <summary>
/// Generate a new Unique name to use for a queue
/// </summary>
/// <returns>A unique name to this channel</returns>
string GenerateUniqueName();
+
+ /// <summary>
+ /// Removes all messages from a queue
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void PurgeQueue(string queueName, bool noWait);
/// <summary>
/// Bind a queue to the specified exchange