You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/10/27 22:26:30 UTC
svn commit: r830346 -
/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
Author: tabish
Date: Tue Oct 27 21:26:30 2009
New Revision: 830346
URL: http://svn.apache.org/viewvc?rev=830346&view=rev
Log:
Adds async versions of the Client Ack Tests
Modified:
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs?rev=830346&r1=830345&r2=830346&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs Tue Oct 27 21:26:30 2009
@@ -29,6 +29,8 @@
{
protected static string TEST_CLIENT_ID = "TestConsumerClientId";
+ private bool dontAck;
+
// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
#if !NETCF
[RowTest]
@@ -120,7 +122,7 @@
{
// Kill the thread - otherwise it'll sit in Receive() until a message arrives.
receiveThread.Interrupt();
- Assert.Fail("IMessageConsumer.Receive() thread is still alive, close should have killed it.");
+ Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
}
}
}
@@ -356,6 +358,87 @@
}
}
+ [Test]
+ public void TestAsyncAckedMessageAreConsumed()
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+ IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.Send(session.CreateTextMessage("Hello"));
+
+ // Consume the message...
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnMessage);
+
+ Thread.Sleep(5000);
+
+ // Reset the session.
+ session.Close();
+
+ session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+ // Attempt to Consume the message...
+ consumer = session.CreateConsumer(queue);
+ IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNull(msg);
+
+ session.Close();
+ }
+ }
+
+ [Test]
+ public void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose()
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+ {
+ connection.Start();
+ // don't aknowledge message on onMessage() call
+ dontAck = true;
+ ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+ IQueue queue = session.GetQueue("Guid.NewGuid().ToString()");
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.Send(session.CreateTextMessage("Hello"));
+
+ // Consume the message...
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ consumer.Listener += new MessageListener(OnMessage);
+ // Don't ack the message.
+
+ // Reset the session. This should cause the Unacked message to be
+ // redelivered.
+ session.Close();
+
+ Thread.Sleep(5000);
+ session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+ // Attempt to Consume the message...
+ consumer = session.CreateConsumer(queue);
+ IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+ Assert.IsNotNull(msg);
+ msg.Acknowledge();
+
+ session.Close();
+ }
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ Assert.IsNotNull(message);
+
+ if(!dontAck)
+ {
+ try
+ {
+ message.Acknowledge();
+ }
+ catch(Exception)
+ {
+ }
+ }
+ }
+
#endif
}