You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/10/27 22:26:30 UTC

svn commit: r830346 - /activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs

Author: tabish
Date: Tue Oct 27 21:26:30 2009
New Revision: 830346

URL: http://svn.apache.org/viewvc?rev=830346&view=rev
Log:
Adds async versions of the Client Ack Tests

Modified:
    activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs?rev=830346&r1=830345&r2=830346&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs Tue Oct 27 21:26:30 2009
@@ -29,6 +29,8 @@
 	{
 		protected static string TEST_CLIENT_ID = "TestConsumerClientId";
 
+        private bool dontAck;
+
 // The .NET CF does not have the ability to interrupt threads, so this test is impossible.
 #if !NETCF
 		[RowTest]
@@ -120,7 +122,7 @@
 							{
 								// Kill the thread - otherwise it'll sit in Receive() until a message arrives.
 								receiveThread.Interrupt();
-								Assert.Fail("IMessageConsumer.Receive() thread is still alive, close should have killed it.");
+								Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
 							}
 						}
 					}
@@ -356,6 +358,87 @@
             }
         }
 
+        [Test]
+        public void TestAsyncAckedMessageAreConsumed() 
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {            
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+        
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += new MessageListener(OnMessage);
+        
+                Thread.Sleep(5000);
+        
+                // Reset the session.
+                session.Close();
+        
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+        
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNull(msg);
+        
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose() 
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {            
+                connection.Start();
+                // don't aknowledge message on onMessage() call
+                dontAck = true;
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue("Guid.NewGuid().ToString()");
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+        
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += new MessageListener(OnMessage);
+                // Don't ack the message.
+        
+                // Reset the session. This should cause the Unacked message to be
+                // redelivered.
+                session.Close();
+        
+                Thread.Sleep(5000);
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsNotNull(msg);
+                msg.Acknowledge();
+        
+                session.Close();
+            }
+        }
+    
+        public void OnMessage(IMessage message) 
+        {
+            Assert.IsNotNull(message);
+            
+            if(!dontAck)
+            {
+                try 
+                {
+                    message.Acknowledge();
+                } 
+                catch(Exception)
+                {
+                }    
+            }
+        }
+        
 #endif
 
 	}