You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/18 02:18:27 UTC

svn commit: r539191 - in /incubator/qpid/branches/M2/dotnet: Qpid.Client.Tests/ Qpid.Client.Tests/Channel/ Qpid.Client/ Qpid.Client/Client/ Qpid.Client/Client/Handler/ Qpid.Client/Client/State/ Qpid.Messaging/

Author: tomasr
Date: Thu May 17 17:18:26 2007
New Revision: 539191

URL: http://svn.apache.org/viewvc?view=rev&rev=539191
Log:
QPID-490 (Contributed by Carlos Medina) Implement PurgeQueue and DeleteQueue

Added:
    incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
Modified:
    incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
    incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
    incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs

Added: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Channel/ChannelQueueTest.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,251 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Net;
+using System.Threading;
+using log4net;
+using Qpid.Client.Qms;
+using Qpid.Messaging;
+using NUnit.Framework;
+
+namespace Qpid.Client.Tests.Channel
+{
+    /// <summary>
+    /// Test the queue methods
+    /// </summary>
+    [TestFixture]
+    public class ChannelQueueTest
+    {
+
+        private static ILog _logger = LogManager.GetLogger(typeof(ChannelQueueTest));
+
+        /// <summary> The default AMQ connection URL to use for tests. </summary>
+        const string DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+        const string _routingKey = "ServiceQ1";
+
+        private ExceptionListenerDelegate _exceptionDelegate;
+        private AutoResetEvent _evt = new AutoResetEvent(false);
+        private Exception _lastException = null;
+
+        private IMessageConsumer _consumer;
+        private IMessagePublisher _publisher;
+        private IChannel _channel;
+        private IConnection _connection;
+
+        private string _queueName;
+
+        [SetUp]
+        public virtual void Init()
+        {
+            _logger.Info("public virtual void Init(): called");
+
+            // Create a connection to the broker.
+            IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(DEFAULT_URI);
+            _connection = new AMQConnection(connectionInfo);
+            _logger.Info("Starting...");
+
+            // Register this to listen for exceptions on the test connection.
+            _exceptionDelegate = new ExceptionListenerDelegate(OnException);
+            _connection.ExceptionListener += _exceptionDelegate;
+
+            // Establish a session on the broker.
+            _channel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
+
+            // Create a durable, non-temporary, non-exclusive queue.
+            _queueName = _channel.GenerateUniqueName();
+            _channel.DeclareQueue(_queueName, true, false, false);
+
+            _channel.Bind(_queueName, ExchangeNameDefaults.TOPIC, _routingKey);
+
+            // Clear the most recent message and exception.
+            _lastException = null;
+        }
+
+        [TearDown]
+        public virtual void ShutDown()
+        {
+            _logger.Info("public virtual void Shutdown(): called");
+
+            if (_connection != null)
+            {
+                _logger.Info("Disposing connection.");
+                _connection.Dispose();
+                _logger.Info("Connection disposed.");
+            }
+        }
+        
+        [Test]
+        public void DeleteInExistentQueue()
+        {
+            try
+            {
+                _channel.DeleteQueue("Q1", false, false, true);
+                _logger.Info("queue deleted");
+            }
+            catch (AMQException e)
+            {
+                _logger.Info(e.ToString());
+            }
+        }
+
+        [Test]
+        public void DeleteUsedQueue()
+        {
+            // Create the consumer
+            _consumer = _channel.CreateConsumerBuilder(_queueName)
+                    .WithPrefetchLow(100)
+                    .Create();
+            _logger.Info("Consumer was created...");
+
+            // delete the queue
+            _channel.DeleteQueue(_queueName, false, true, true);
+            _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+            Assert.IsNull(_lastException);
+        }
+
+        [Test]
+        public void DeleteUnUsedQueue()
+        {
+            // delete the queue
+            _channel.DeleteQueue(_queueName, true, true, true);
+            _logger.InfoFormat("Queue {0} was delete", _queueName);
+
+            Assert.IsNull(_lastException);
+        }
+
+        [Test]
+        public void DeleteNonEmptyQueue()
+        {
+            // Create the publisher
+            _publisher = _channel.CreatePublisherBuilder()
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_routingKey)
+                .Create();
+            _logger.Info("Publisher created...");
+            SendTestMessage("Message 1");
+
+            try
+            {
+                _channel.DeleteQueue(_queueName, true, false, true);
+            }
+            catch (AMQException)
+            {
+                Assert.Fail("The test fails");
+            }            
+        }
+
+        [Test]
+        public void DeleteEmptyQueue()
+        {
+            // Create the publisher
+            _publisher = _channel.CreatePublisherBuilder()
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_routingKey)
+                .Create();
+            _logger.Info("Publisher created...");
+
+            // delete an empty queue with ifEmpty = true
+            _channel.DeleteQueue(_queueName, false, true, true);
+
+            Assert.IsNull(_lastException);
+        }
+
+        [Test]
+        public void DeleteQueueWithResponse()
+        {
+            // Create the publisher
+            _publisher = _channel.CreatePublisherBuilder()
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_routingKey)
+                .Create();
+            _logger.Info("Publisher created...");
+
+            SendTestMessage("Message 1");
+            SendTestMessage("Message 2");
+            
+            // delete the queue, the server must respond
+            _channel.DeleteQueue(_queueName, false, false, false);
+        }
+
+        [Test]
+        public void PurgeQueueWithResponse()
+        {
+            _publisher = _channel.CreatePublisherBuilder()
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_routingKey)
+                .Create();
+            _logger.Info("Pubisher created");
+
+            SendTestMessage("Message 1");
+            SendTestMessage("Message 2");
+
+            _channel.PurgeQueue(_queueName, false);
+        }
+
+        [Test]
+        public void PurgeQueueWithOutResponse()
+        {
+            _publisher = _channel.CreatePublisherBuilder()
+                .WithExchangeName(ExchangeNameDefaults.TOPIC)
+                .WithRoutingKey(_routingKey)
+                .Create();
+            _logger.Info("Pubisher created");
+
+            SendTestMessage("Message 1");
+            SendTestMessage("Message 2");
+
+            _channel.PurgeQueue(_queueName, true);
+        }
+
+
+        /// <summary>
+        /// Callback method to handle any exceptions raised by the test connection.</summary>        /// 
+        /// <param name="e">The connection exception.</param>
+        public void OnException(Exception e)
+        {
+            // Preserve the most recent exception in case test cases need to examine it.
+            _lastException = e;
+
+            // Notify any waiting threads that an exception event has occurred.
+            _evt.Set();
+        }
+
+        /// <summary>
+        /// Sends the specified message to the test publisher, and confirms that it was received by the test consumer or not
+        /// depending on whether or not the message should be received by the consumer.
+        /// 
+        /// Any exceptions raised by the connection will cause an Assert failure exception to be raised.
+        /// </summary>
+        /// 
+        /// <param name="msgSend">The message to send.</param>
+        private void SendTestMessage(string msg)
+        {
+            // create the IMessage object
+            IMessage msgSend = _channel.CreateTextMessage(msg);
+
+            // send the message
+            _publisher.Send(msgSend);
+            _logger.InfoFormat("The messages \"{0}\" was sent", msg);
+        }
+
+    }
+}

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client.Tests/Qpid.Client.Tests.csproj Thu May 17 17:18:26 2007
@@ -46,6 +46,7 @@
   <ItemGroup>
     <Compile Include="BrokerDetails\BrokerDetailsTest.cs" />
     <Compile Include="Channel\ChannelMessageCreationTests.cs" />
+    <Compile Include="Channel\ChannelQueueTest.cs" />
     <Compile Include="Messages\MessageFactoryRegistryTests.cs" />
     <Compile Include="connection\ConnectionTest.cs" />
     <Compile Include="connection\SslConnectionTest.cs" />

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/AmqChannel.cs Thu May 17 17:18:26 2007
@@ -122,7 +122,7 @@
 
                     if (consumer == null)
                     {
-                        _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a handler - ignoring...");
+                        _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring...");
                     }
                     else
                     {
@@ -673,6 +673,30 @@
             // at this point the _consumers map will be empty
         }
 
+        public void PurgeQueue(string queueName, bool noWait)
+        {
+            DoPurgeQueue(queueName, noWait);
+        }
+
+        private void DoPurgeQueue(string queueName, bool noWait)
+        {
+            try
+            {
+                _logger.DebugFormat("PurgeQueue {0}", queueName);
+
+                AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait);
+
+                if (noWait)
+                    _connection.ProtocolWriter.Write(purgeQueue);
+                else
+                    _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody));
+            }
+            catch (AMQException)
+            {
+                throw;
+            }
+        }
+
         /**
          * Replays frame on fail over.
          * 
@@ -748,9 +772,34 @@
             throw new NotImplementedException(); // FIXME
         }
 
-        public void DeleteQueue()
+        public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
         {
-            throw new NotImplementedException(); // FIXME
+            DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait);
+        }
+
+        private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait)
+        {
+            try
+            {
+                _logger.Debug(string.Format("DeleteQueue name={0}", queueName));
+                
+                AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0,
+                    queueName, // queueName
+                    ifUnused, // IfUnUsed
+                    ifEmpty, // IfEmpty
+                    noWait); // NoWait
+
+                _replayFrames.Add(queueDelete);
+
+                if (noWait)
+                    _connection.ProtocolWriter.Write(queueDelete);
+                else
+                    _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody));
+            }
+            catch (AMQException)
+            {
+                throw;
+            }
         }
 
         public MessageConsumerBuilder CreateConsumerBuilder(string queueName)

Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueueDeleteOkMethodHandler.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+    public class QueueDeleteOkMethodHandler : IStateAwareMethodListener
+    {
+
+        private static readonly ILog _logger = LogManager.GetLogger(typeof(QueueDeleteOkMethodHandler));
+
+        public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+        {
+            QueueDeleteOkBody body = (QueueDeleteOkBody)evt.Method;
+            if (body != null)
+            {
+                _logger.InfoFormat("Received Queue.Delete-Ok message, message count {0}", body.MessageCount);
+            }
+        }
+
+    }
+}

Added: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs?view=auto&rev=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs (added)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/Handler/QueuePurgeOkMethodHandler.cs Thu May 17 17:18:26 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using log4net;
+using Qpid.Client.Message;
+using Qpid.Client.Protocol;
+using Qpid.Client.State;
+using Qpid.Framing;
+
+namespace Qpid.Client.Handler
+{
+    public class QueuePurgeOkMethodHandler : IStateAwareMethodListener
+    {
+
+        private static readonly ILog _logger = LogManager.GetLogger(typeof(QueuePurgeOkMethodHandler));
+
+        public void MethodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
+        {
+            QueuePurgeOkBody body = (QueuePurgeOkBody)evt.Method;
+            if (body != null)
+            {
+                _logger.InfoFormat("Received Queue.Purge-Ok message, message count {0}", body.MessageCount);
+            }
+        }
+
+    }
+}

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Client/State/AMQStateManager.cs Thu May 17 17:18:26 2007
@@ -65,6 +65,8 @@
             IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
             IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
             IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+            IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+            IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
             
             // We need to register a map for the null (i.e. all state) handlers otherwise you get
             // a stack overflow in the handler searching code when you present it with a frame for which
@@ -96,6 +98,8 @@
                 open[typeof(ConnectionCloseBody)] = connectionClose;
                 open[typeof(BasicDeliverBody)] = basicDeliver;
                 open[typeof(BasicReturnBody)] = basicReturn;
+                open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+                open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
                 _state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
             }
             {

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 17 17:18:26 2007
@@ -50,6 +50,8 @@
     <Compile Include="Client\AMQNoConsumersException.cs" />
     <Compile Include="Client\AMQNoRouteException.cs" />
     <Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+    <Compile Include="Client\Handler\QueueDeleteOkMethodHandler.cs" />
+    <Compile Include="Client\Handler\QueuePurgeOkMethodHandler.cs" />
     <Compile Include="Client\SslOptions.cs" />
     <Compile Include="Client\Message\QpidHeaders.cs" />
     <Compile Include="Client\QpidConnectionInfo.cs" />

Modified: incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=539191&r1=539190&r2=539191
==============================================================================
--- incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/branches/M2/dotnet/Qpid.Messaging/IChannel.cs Thu May 17 17:18:26 2007
@@ -68,14 +68,25 @@
         /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
         void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete);
         /// <summary>
-        /// Deletes a queue (todo: fix)
+        /// Delete a queue with the specifies arguments
         /// </summary>
-        void DeleteQueue();
+        /// <param name="queueName">Name of the queue to delete</param>
+        /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+        /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+        /// <param name="noWait">If true, the server will not respond to the method</param>
+        void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait);
         /// <summary>
         /// Generate a new Unique name to use for a queue
         /// </summary>
         /// <returns>A unique name to this channel</returns>
         string GenerateUniqueName();
+
+        /// <summary>
+        /// Removes all messages from a queue
+        /// </summary>
+        /// <param name="queueName">Name of the queue to delete</param>
+        /// <param name="noWait">If true, the server will not respond to the method</param>
+        void PurgeQueue(string queueName, bool noWait);
         
         /// <summary>
         /// Bind a queue to the specified exchange