You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/12/12 19:02:27 UTC
svn commit: r726078 -
/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
Author: jgomes
Date: Fri Dec 12 10:02:26 2008
New Revision: 726078
URL: http://svn.apache.org/viewvc?rev=726078&view=rev
Log:
Modify MessageSelectorTest to delay creation of second consumer to show that first consumer is blocked until second consumer starts consuming.
Modified:
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs?rev=726078&r1=726077&r2=726078&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs Fri Dec 12 10:02:26 2008
@@ -31,8 +31,10 @@
protected const string TOPIC_DESTINATION_NAME = "topic://MessageSelectorTopic";
protected const string TEST_CLIENT_ID = "MessageSelectorClientId";
protected const string TEST_CLIENT_ID2 = "MessageSelectorClientId2";
+ protected const string TEST_CLIENT_ID3 = "MessageSelectorClientId3";
private int receivedNonIgnoredMsgCount = 0;
+ private int receivedIgnoredMsgCount = 0;
#if !NET_1_1
[RowTest]
@@ -45,32 +47,39 @@
{
using(IConnection connection1 = CreateConnection(TEST_CLIENT_ID))
using(IConnection connection2 = CreateConnection(TEST_CLIENT_ID2))
+ using(IConnection connection3 = CreateConnection(TEST_CLIENT_ID3))
{
connection1.Start();
connection2.Start();
+ connection3.Start();
using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession session3 = connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination1 = CreateDestination(session1, destinationName);
- IDestination destination2 = CreateDestination(session2, destinationName);
+ IDestination destination2 = SessionUtil.GetDestination(session2, destinationName);
+ IDestination destination3 = SessionUtil.GetDestination(session2, destinationName);
using(IMessageProducer producer = session1.CreateProducer(destination1))
- using(IMessageConsumer consumer = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
+ using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
{
const int MaxNumRequests = 100000;
int numNonIgnoredMsgsSent = 0;
+ int numIgnoredMsgsSent = 0;
producer.Persistent = persistent;
// producer.RequestTimeout = receiveTimeout;
receivedNonIgnoredMsgCount = 0;
- consumer.Listener += new MessageListener(OnNonIgnoredMessage);
+ receivedIgnoredMsgCount = 0;
+ consumer1.Listener += new MessageListener(OnNonIgnoredMessage);
+ IMessageConsumer consumer2 = null;
for(int index = 1; index <= MaxNumRequests; index++)
{
IMessage request = session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, MaxNumRequests));
- //request.NMSTimeToLive = TimeSpan.FromSeconds(10);
+ // request.NMSTimeToLive = TimeSpan.FromSeconds(10);
if(0 == (index % 2))
{
request.NMSType = "ACTIVE";
@@ -79,16 +88,51 @@
else
{
request.NMSType = "ACTIVE.IGNORE";
+ numIgnoredMsgsSent++;
}
producer.Send(request);
+
+ if(20000 == index)
+ {
+ // Start the second consumer
+ consumer2 = session3.CreateConsumer(destination2, "JMSType LIKE '%IGNORE'");
+ consumer2.Listener += new MessageListener(OnIgnoredMessage);
+ }
}
- while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent)
+ int waitCount = 0;
+ int lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+ int lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+ while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent
+ || receivedIgnoredMsgCount < numIgnoredMsgsSent)
{
+ if(lastReceivedINongnoredMsgCount != receivedNonIgnoredMsgCount
+ || lastReceivedIgnoredMsgCount != receivedIgnoredMsgCount)
+ {
+ // Reset the wait count.
+ waitCount = 0;
+ Console.WriteLine("Reset the wait count while we are still receiving msgs.");
+ Thread.Sleep(2000);
+ continue;
+ }
+
+ lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+ lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+ if(waitCount > 60)
+ {
+ Assert.Fail(String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered. Only {2} of {3} ignored messages delivered.",
+ receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount, numIgnoredMsgsSent));
+ }
+
Console.WriteLine("Waiting to receive all non-ignored messages...");
Thread.Sleep(1000);
+ waitCount++;
}
+
+ consumer2.Dispose();
}
}
}
@@ -99,5 +143,12 @@
receivedNonIgnoredMsgCount++;
Assert.AreEqual(message.NMSType, "ACTIVE");
}
+
+ protected void OnIgnoredMessage(IMessage message)
+ {
+ receivedIgnoredMsgCount++;
+ Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE");
+ Thread.Sleep(100);
+ }
}
}