You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/12/12 19:02:27 UTC

svn commit: r726078 - /activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs

Author: jgomes
Date: Fri Dec 12 10:02:26 2008
New Revision: 726078

URL: http://svn.apache.org/viewvc?rev=726078&view=rev
Log:
Modify MessageSelectorTest to delay creation of second consumer to show that first consumer is blocked until second consumer starts consuming.

Modified:
    activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs?rev=726078&r1=726077&r2=726078&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs Fri Dec 12 10:02:26 2008
@@ -31,8 +31,10 @@
 		protected const string TOPIC_DESTINATION_NAME = "topic://MessageSelectorTopic";
 		protected const string TEST_CLIENT_ID = "MessageSelectorClientId";
 		protected const string TEST_CLIENT_ID2 = "MessageSelectorClientId2";
+		protected const string TEST_CLIENT_ID3 = "MessageSelectorClientId3";
 
 		private int receivedNonIgnoredMsgCount = 0;
+		private int receivedIgnoredMsgCount = 0;
 
 #if !NET_1_1
 		[RowTest]
@@ -45,32 +47,39 @@
 		{
 			using(IConnection connection1 = CreateConnection(TEST_CLIENT_ID))
 			using(IConnection connection2 = CreateConnection(TEST_CLIENT_ID2))
+			using(IConnection connection3 = CreateConnection(TEST_CLIENT_ID3))
 			{
 				connection1.Start();
 				connection2.Start();
+				connection3.Start();
 				using(ISession session1 = connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
 				using(ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
+				using(ISession session3 = connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
 				{
 					IDestination destination1 = CreateDestination(session1, destinationName);
-					IDestination destination2 = CreateDestination(session2, destinationName);
+					IDestination destination2 = SessionUtil.GetDestination(session2, destinationName);
+					IDestination destination3 = SessionUtil.GetDestination(session2, destinationName);
 
 					using(IMessageProducer producer = session1.CreateProducer(destination1))
-					using(IMessageConsumer consumer = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
+					using(IMessageConsumer consumer1 = session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
 					{
 						const int MaxNumRequests = 100000;
 						int numNonIgnoredMsgsSent = 0;
+						int numIgnoredMsgsSent = 0;
 
 						producer.Persistent = persistent;
 						// producer.RequestTimeout = receiveTimeout;
 
 						receivedNonIgnoredMsgCount = 0;
-						consumer.Listener += new MessageListener(OnNonIgnoredMessage);
+						receivedIgnoredMsgCount = 0;
+						consumer1.Listener += new MessageListener(OnNonIgnoredMessage);
+						IMessageConsumer consumer2 = null;
 
 						for(int index = 1; index <= MaxNumRequests; index++)
 						{
 							IMessage request = session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, MaxNumRequests));
 
-							//request.NMSTimeToLive = TimeSpan.FromSeconds(10);
+							// request.NMSTimeToLive = TimeSpan.FromSeconds(10);
 							if(0 == (index % 2))
 							{
 								request.NMSType = "ACTIVE";
@@ -79,16 +88,51 @@
 							else
 							{
 								request.NMSType = "ACTIVE.IGNORE";
+								numIgnoredMsgsSent++;
 							}
 
 							producer.Send(request);
+
+							if(20000 == index)
+							{
+								// Start the second consumer
+								consumer2 = session3.CreateConsumer(destination2, "JMSType LIKE '%IGNORE'");
+								consumer2.Listener += new MessageListener(OnIgnoredMessage);
+							}
 						}
 
-						while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent)
+						int waitCount = 0;
+						int lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+						int lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+						while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent
+								|| receivedIgnoredMsgCount < numIgnoredMsgsSent)
 						{
+							if(lastReceivedINongnoredMsgCount != receivedNonIgnoredMsgCount
+								|| lastReceivedIgnoredMsgCount != receivedIgnoredMsgCount)
+							{
+								// Reset the wait count.
+								waitCount = 0;
+								Console.WriteLine("Reset the wait count while we are still receiving msgs.");
+								Thread.Sleep(2000);
+								continue;
+							}
+
+							lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+							lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+							if(waitCount > 60)
+							{
+								Assert.Fail(String.Format("Timeout waiting for all messages to be delivered. Only {0} of {1} non-ignored messages delivered.  Only {2} of {3} ignored messages delivered.",
+									receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount, numIgnoredMsgsSent));
+							}
+
 							Console.WriteLine("Waiting to receive all non-ignored messages...");
 							Thread.Sleep(1000);
+							waitCount++;
 						}
+
+						consumer2.Dispose();
 					}
 				}
 			}
@@ -99,5 +143,12 @@
 			receivedNonIgnoredMsgCount++;
 			Assert.AreEqual(message.NMSType, "ACTIVE");
 		}
+
+		protected void OnIgnoredMessage(IMessage message) // Listener attached to consumer2, whose selector is "JMSType LIKE '%IGNORE'".
+		{
+			receivedIgnoredMsgCount++; // Counter polled by the test's wait loop. NOTE(review): plain ++ is not atomic; if listeners run on a provider thread, Interlocked.Increment may be needed -- confirm threading model.
+			Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE"); // NOTE(review): the usual AreEqual order is (expected, actual); these look swapped, which would only affect the failure message -- confirm.
+			Thread.Sleep(100); // Presumably slows this consumer on purpose so it lags behind the first consumer -- TODO confirm intent.
+		}
 	}
 }