You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/07 20:40:17 UTC
svn commit: r896980 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk:
./ src/test/csharp/
Author: tabish
Date: Thu Jan 7 19:40:17 2010
New Revision: 896980
URL: http://svn.apache.org/viewvc?rev=896980&view=rev
Log:
Update the IndividualAckTest and add more Transaction Tests
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs?rev=896980&r1=896979&r2=896980&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs Thu Jan 7 19:40:17 2010
@@ -73,7 +73,7 @@
session.Close();
}
- //[Test]
+ [Test]
public void TestLastMessageAcked()
{
ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
@@ -88,11 +88,11 @@
// Consume the message...
IMessageConsumer consumer = session.CreateConsumer(queue);
- IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNotNull(msg);
- msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNotNull(msg);
- msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNotNull(msg);
msg.Acknowledge();
@@ -102,13 +102,13 @@
// Attempt to Consume the message...
consumer = session.CreateConsumer(queue);
- msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNotNull(msg);
- Assert.AreEqual(msg1,msg);
- msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.AreEqual(msg1.Text, msg.Text);
+ msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNotNull(msg);
- Assert.AreEqual(msg2,msg);
- msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.AreEqual(msg2.Text, msg.Text);
+ msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
Assert.IsNull(msg);
session.Close();
}
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs Thu Jan 7 19:40:17 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    /// <summary>
+    /// Runs the shared transaction tests inherited from
+    /// <see cref="StompTransactionTestSupport"/> against a queue destination.
+    /// </summary>
+    [TestFixture]
+    public class StompQueueTransactionTest : StompTransactionTestSupport
+    {
+        public const string CLIENT_ID = "QueueTransactionTest";
+        public const string DESTINATION_NAME = "QueueTransactionTestDestination";
+
+        /// <summary>Always false: this fixture selects a queue.</summary>
+        protected override bool Topic
+        {
+            get { return false; }
+        }
+
+        /// <summary>
+        /// Client id made of <see cref="CLIENT_ID"/>, a colon and a random number.
+        /// A new random suffix is produced on every read.
+        /// </summary>
+        protected override string TestClientId
+        {
+            get
+            {
+                int suffix = new Random().Next();
+                return CLIENT_ID + ":" + suffix;
+            }
+        }
+
+        /// <summary>Always null: no durable subscription name is supplied.</summary>
+        protected override string Subscription
+        {
+            get { return null; }
+        }
+
+        /// <summary>Name of the queue used by the tests.</summary>
+        protected override string DestinationName
+        {
+            get { return DESTINATION_NAME; }
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs Thu Jan 7 19:40:17 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    /// <summary>
+    /// Runs the shared transaction tests inherited from
+    /// <see cref="StompTransactionTestSupport"/> against a topic, using a
+    /// fixed client id and subscription name.
+    /// </summary>
+    [TestFixture]
+    public class StompTopicTransactionTest : StompTransactionTestSupport
+    {
+        public const string CLIENT_ID = "TopicTransactionTest";
+        public const string DESTINATION_NAME = "StompTopicTransactionTestDestination";
+        public const string SUBSCRIPTION_NAME = "TopicTransactionTest";
+
+        /// <summary>Always true: this fixture selects a topic.</summary>
+        protected override bool Topic
+        {
+            get { return true; }
+        }
+
+        /// <summary>Fixed client id, <see cref="CLIENT_ID"/>.</summary>
+        protected override string TestClientId
+        {
+            get { return CLIENT_ID; }
+        }
+
+        /// <summary>Fixed subscription name, <see cref="SUBSCRIPTION_NAME"/>.</summary>
+        protected override string Subscription
+        {
+            get { return SUBSCRIPTION_NAME; }
+        }
+
+        /// <summary>Name of the topic used by the tests.</summary>
+        protected override string DestinationName
+        {
+            get { return DESTINATION_NAME; }
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs Thu Jan 7 19:40:17 2010
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using Apache.NMS.Stomp.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+ public abstract class StompTransactionTestSupport : NMSTestSupport
+ {
+ private const int MESSAGE_COUNT = 5;
+ private const string MESSAGE_TEXT = "message";
+
+ private IConnection connection;
+ private ISession session;
+ private IMessageConsumer consumer;
+ private IMessageProducer producer;
+ private IDestination destination;
+
+ private int batchCount = 10;
+ private int batchSize = 20;
+
+ // for message listener test
+ private LinkedList<IMessage> unackMessages = new LinkedList<IMessage>();
+ private LinkedList<IMessage> ackMessages = new LinkedList<IMessage>();
+ private bool resendPhase;
+
+        /// <summary>
+        /// Per-test initialisation: runs the base set-up, resets the listener-test
+        /// phase flag, opens a fresh connection and session, then drains the destination.
+        /// </summary>
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+            resendPhase = false;
+
+            // Connect first: PurgeDestination reads from the consumer that Reconnect creates.
+            Reconnect();
+            PurgeDestination();
+        }
+
+        /// <summary>
+        /// Per-test clean-up: closes the session and the connection, clears the
+        /// listener-test message lists, runs the base tear-down and pauses for 500 ms.
+        /// </summary>
+        /// <remarks>
+        /// The session or connection may be null (Reconnect clears both before it
+        /// creates new ones), so each is checked before closing. Every later step
+        /// still runs if an earlier Close() throws; the exception then propagates
+        /// to the test runner.
+        /// </remarks>
+        [TearDown]
+        public override void TearDown()
+        {
+            // Detach the fields up front so a failing Close() cannot leave stale references.
+            ISession oldSession = this.session;
+            IConnection oldConnection = this.connection;
+            this.session = null;
+            this.connection = null;
+
+            try
+            {
+                if(oldSession != null)
+                {
+                    oldSession.Close();
+                }
+            }
+            finally
+            {
+                try
+                {
+                    if(oldConnection != null)
+                    {
+                        oldConnection.Close();
+                    }
+                }
+                finally
+                {
+                    this.unackMessages.Clear();
+                    this.ackMessages.Clear();
+
+                    base.TearDown();
+
+                    Thread.Sleep(500);
+                }
+            }
+        }
+
+        /// <summary>
+        /// True when the tests should run against a topic, false for a queue
+        /// (ReconnectSession uses this to choose GetTopic or GetQueue).
+        /// </summary>
+        protected abstract bool Topic { get; }
+
+        /// <summary>
+        /// Client id that CreateConnection() passes on when opening the test connection.
+        /// </summary>
+        protected abstract string TestClientId { get; }
+
+        /// <summary>
+        /// Durable subscription name. When it is null, CreateMessageConsumer creates
+        /// a plain consumer instead of a durable one.
+        /// </summary>
+        protected abstract string Subscription { get; }
+
+        /// <summary>
+        /// Name of the queue or topic that ReconnectSession looks up on the session.
+        /// </summary>
+        protected abstract string DestinationName { get; }
+
+        /// <summary>
+        /// Opens a connection using this fixture's <see cref="TestClientId"/>.
+        /// </summary>
+        /// <returns>The connection returned by the client-id overload of CreateConnection.</returns>
+        public override IConnection CreateConnection()
+        {
+            string clientId = TestClientId;
+            return CreateConnection(clientId);
+        }
+
+        /// <summary>
+        /// Marks the start of a transaction in the test flow. Deliberately empty:
+        /// nothing is called on the session here.
+        /// </summary>
+        protected void BeginTx()
+        {
+            // No-op. Presumably the transacted session starts its transaction
+            // implicitly; that is not shown in this file.
+        }
+
+        /// <summary>Commits the current session's transaction.</summary>
+        protected void CommitTx()
+        {
+            this.session.Commit();
+        }
+
+        /// <summary>Rolls back the current session's transaction.</summary>
+        protected void RollbackTx()
+        {
+            this.session.Rollback();
+        }
+
+        /// <summary>
+        /// Sends and then receives <c>batchCount</c> batches of <c>batchSize</c>
+        /// text messages, committing after every send batch and every receive batch.
+        /// </summary>
+        [Test]
+        public void TestSendReceiveTransactedBatches()
+        {
+            const string batchText = "Batch IMessage";
+
+            // Kept separate from the received messages so every batch sends the
+            // message created here rather than one that was just consumed.
+            ITextMessage outbound = session.CreateTextMessage(batchText);
+
+            for(int j = 0; j < batchCount; j++)
+            {
+                BeginTx();
+
+                for(int i = 0; i < batchSize; i++)
+                {
+                    producer.Send(outbound);
+                }
+
+                CommitTx();
+
+                BeginTx();
+                for(int i = 0; i < batchSize; i++)
+                {
+                    ITextMessage received = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(5000));
+                    Assert.IsNotNull(received, "Received only " + i + " messages in batch " + j);
+                    Assert.AreEqual(batchText, received.Text);
+                }
+
+                CommitTx();
+            }
+        }
+
+        /// <summary>
+        /// Commits one message, rolls back a second send, commits a third, and
+        /// checks that only the two committed messages are received.
+        /// </summary>
+        [Test]
+        public void TestSendRollback()
+        {
+            IMessage[] outbound = new IMessage[]
+                {session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")};
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // sends a message that gets rolled back
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            RollbackTx();
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receives the first message
+            BeginTx();
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            // Fail here on a receive timeout instead of passing null to the comparison.
+            Assert.IsNotNull(message, "Did not receive the first committed message.");
+            messages.AddLast(message);
+
+            // receives the second message
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Did not receive the second committed message.");
+            messages.AddLast(message);
+
+            // validates that the rolled-back message was not consumed
+            CommitTx();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Commits one message, sends another without committing, then replaces the
+        /// session via ReconnectSession. After committing a further message on the
+        /// new session, only the two committed messages are expected.
+        /// </summary>
+        [Test]
+        public void TestSendSessionClose()
+        {
+            IMessage[] outbound = new IMessage[]
+            {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")
+            };
+
+            // First message: sent and committed.
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // Uncommitted send; the session is replaced before any commit.
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+
+            ReconnectSession();
+
+            // Second message: sent and committed on the new session.
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // Both committed messages must arrive, in order.
+            List<IMessage> received = new List<IMessage>();
+            BeginTx();
+            IMessage next = consumer.Receive(TimeSpan.FromSeconds(1));
+            Assert.IsNotNull(next);
+            received.Add(next);
+
+            next = consumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.IsNotNull(next);
+            received.Add(next);
+
+            CommitTx();
+
+            // The uncommitted message must not be among them.
+            AssertTextMessagesEqual(outbound, received.ToArray(), "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Commits one message, sends another without committing, then closes the
+        /// consumer and session and reconnects. After committing a further message on
+        /// the new connection, only the two committed messages are expected.
+        /// </summary>
+        [Test]
+        public void TestSendSessionAndConnectionClose()
+        {
+            IMessage[] outbound = new IMessage[]
+            {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")
+            };
+
+            // First message: sent and committed.
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // Uncommitted send; consumer and session are closed before any commit.
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+            session.Close();
+
+            Reconnect();
+
+            // Second message: sent and committed on the new connection.
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // Both committed messages must arrive, in order.
+            List<IMessage> received = new List<IMessage>();
+            BeginTx();
+            IMessage next = consumer.Receive(TimeSpan.FromSeconds(1));
+            Assert.IsNotNull(next);
+            received.Add(next);
+
+            next = consumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.IsNotNull(next);
+            received.Add(next);
+
+            CommitTx();
+
+            // The uncommitted message must not be among them.
+            AssertTextMessagesEqual(outbound, received.ToArray(), "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Receives and commits the first of two messages, receives the second and
+        /// rolls back, then expects the second message to be delivered again.
+        /// </summary>
+        [Test]
+        public void TestReceiveRollback()
+        {
+            IMessage[] outbound = new IMessage[]
+            {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")
+            };
+
+            // Drain anything left over from previous test runs.
+            PurgeDestination();
+
+            // Send both messages in one transaction.
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            List<IMessage> received = new List<IMessage>();
+
+            // Receive and commit the first message.
+            BeginTx();
+            IMessage current = consumer.Receive(TimeSpan.FromSeconds(1));
+            Assert.IsNotNull(current);
+            received.Add(current);
+            AssertEquals(outbound[0], current);
+            CommitTx();
+
+            // Receive the second message, then roll back so it is delivered again.
+            BeginTx();
+            current = consumer.Receive(TimeSpan.FromSeconds(1));
+            Assert.IsNotNull(current);
+            AssertEquals(outbound[1], current);
+            RollbackTx();
+
+            // The rolled-back message should be redelivered.
+            BeginTx();
+            current = consumer.Receive(TimeSpan.FromSeconds(5));
+            Assert.IsNotNull(current, "Should have re-received the message again!");
+            received.Add(current);
+            CommitTx();
+
+            AssertTextMessagesEqual(outbound, received.ToArray(), "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Receives two messages in one transaction, rolls back, and checks that
+        /// both are redelivered in order and that nothing else follows.
+        /// </summary>
+        [Test]
+        public void TestReceiveTwoThenRollback()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // let's consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            // Fail here on a receive timeout rather than inside the comparison helper.
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[0], message);
+
+            message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[1], message);
+            RollbackTx();
+
+            // Consume again.. both messages should
+            // get redelivered.
+            BeginTx();
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the first message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[0], message);
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the second message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[1], message);
+
+            Assert.IsNull(consumer.ReceiveNoWait());
+            CommitTx();
+
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Sets every prefetch limit to one, recreates the session, then sends four
+        /// messages in one transaction and receives four in another.
+        /// </summary>
+        [Test]
+        public void TestSendReceiveWithPrefetchOne()
+        {
+            SetPrefetchToOne();
+            ReconnectSession();
+
+            IMessage[] outbound = new IMessage[]
+            {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage"),
+                session.CreateTextMessage("Third IMessage"),
+                session.CreateTextMessage("Fourth IMessage")
+            };
+
+            // Send everything in a single transaction.
+            BeginTx();
+            foreach(IMessage pending in outbound)
+            {
+                producer.Send(pending);
+            }
+            CommitTx();
+
+            // Expect one message back for each one sent.
+            BeginTx();
+            int remaining = outbound.Length;
+            while(remaining > 0)
+            {
+                IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));
+                Assert.IsNotNull(message);
+                remaining--;
+            }
+            CommitTx();
+        }
+
+        /// <summary>
+        /// Runs <see cref="TestReceiveTwoThenRollback"/> five times in a row on the
+        /// same connection and session.
+        /// </summary>
+        [Test]
+        public void TestReceiveTwoThenRollbackManyTimes()
+        {
+            int remainingRuns = 5;
+            while(remainingRuns > 0)
+            {
+                TestReceiveTwoThenRollback();
+                remainingRuns--;
+            }
+        }
+
+        /// <summary>
+        /// Repeats <see cref="TestSendRollback"/> after setting every prefetch limit to one.
+        /// </summary>
+        [Test]
+        public void TestSendRollbackWithPrefetchOfOne()
+        {
+            SetPrefetchToOne();
+
+            // TestSendReceiveWithPrefetchOne recreates the session after changing the
+            // policy; do the same here so the consumer is created under the new
+            // setting. NOTE(review): assumes the policy is read when the consumer is
+            // created -- confirm against the provider.
+            ReconnectSession();
+
+            TestSendRollback();
+        }
+
+        /// <summary>
+        /// Repeats <see cref="TestReceiveRollback"/> after setting every prefetch limit to one.
+        /// </summary>
+        [Test]
+        public void TestReceiveRollbackWithPrefetchOfOne()
+        {
+            SetPrefetchToOne();
+
+            // TestSendReceiveWithPrefetchOne recreates the session after changing the
+            // policy; do the same here so the consumer is created under the new
+            // setting. NOTE(review): assumes the policy is read when the consumer is
+            // created -- confirm against the provider.
+            ReconnectSession();
+
+            TestReceiveRollback();
+        }
+
+        /// <summary>
+        /// Receives one of two messages, closes the consumer before committing, and
+        /// checks that a new consumer then gets the second message, so the close did
+        /// not roll back the first receive.
+        /// </summary>
+        [Test]
+        public void TestCloseConsumerBeforeCommit()
+        {
+            ITextMessage[] outbound = new ITextMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // let's consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            // sends the messages
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            BeginTx();
+            ITextMessage message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            // Report a receive timeout as an assertion failure, not a NullReferenceException.
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[0].Text, message.Text);
+            // Close the consumer before the Commit. This should not cause the
+            // received message to Rollback.
+            consumer.Close();
+            CommitTx();
+
+            // Create a new consumer
+            consumer = CreateMessageConsumer();
+
+            BeginTx();
+            message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[1].Text, message.Text);
+            CommitTx();
+        }
+
+        /// <summary>
+        /// Closes the current connection if there is one, waits 500 ms, then creates
+        /// a new connection with a new session, producer and consumer and starts it.
+        /// </summary>
+        protected void Reconnect()
+        {
+            IConnection previous = connection;
+            if(previous != null)
+            {
+                // Close the old connection before opening the next one.
+                previous.Close();
+                connection = null;
+            }
+
+            Thread.Sleep(500);
+
+            // Clear the session so ReconnectSession does not try to close it again.
+            session = null;
+            connection = CreateConnection();
+
+            ReconnectSession();
+            connection.Start();
+        }
+
+        /// <summary>
+        /// Closes the current session if there is one and creates a new transactional
+        /// session on the existing connection, with a fresh destination, producer and
+        /// consumer.
+        /// </summary>
+        protected void ReconnectSession()
+        {
+            if(session != null)
+            {
+                session.Close();
+            }
+
+            session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+            // The concrete fixture decides between queue and topic.
+            if(!Topic)
+            {
+                destination = session.GetQueue(DestinationName);
+            }
+            else
+            {
+                destination = session.GetTopic(DestinationName);
+            }
+
+            producer = session.CreateProducer(destination);
+            consumer = CreateMessageConsumer();
+        }
+
+        /// <summary>
+        /// Drains the destination through the current consumer and commits if at
+        /// least one message was removed.
+        /// </summary>
+        protected void PurgeDestination()
+        {
+            BeginTx();
+
+            // Count what is drained so we know whether there is anything to commit.
+            int drained = 0;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                drained++;
+            }
+
+            if(drained > 0)
+            {
+                CommitTx();
+            }
+        }
+
+        /// <summary>
+        /// Creates a consumer on the current destination: a plain consumer when
+        /// <see cref="Subscription"/> is null, otherwise a durable consumer with that
+        /// name on the destination cast to ITopic.
+        /// </summary>
+        /// <returns>The newly created consumer.</returns>
+        protected IMessageConsumer CreateMessageConsumer()
+        {
+            if(Subscription == null)
+            {
+                return session.CreateConsumer(destination);
+            }
+
+            ITopic topic = (ITopic) destination;
+            return session.CreateDurableConsumer(topic, Subscription, null, false);
+        }
+
+        /// <summary>
+        /// Calls SetAll(1) on the current connection's prefetch policy.
+        /// </summary>
+        protected void SetPrefetchToOne()
+        {
+            PrefetchPolicy policy = GetPrefetchPolicy();
+            policy.SetAll(1);
+        }
+
+        /// <summary>
+        /// Returns the prefetch policy of the current connection, which is cast to
+        /// the concrete Connection type to reach it.
+        /// </summary>
+        /// <returns>The connection's PrefetchPolicy.</returns>
+        protected PrefetchPolicy GetPrefetchPolicy()
+        {
+            Connection stompConnection = (Connection) connection;
+            return stompConnection.PrefetchPolicy;
+        }
+
+        /// <summary>
+        /// Sends MESSAGE_COUNT messages and consumes them through
+        /// <see cref="OnMessage"/>. The listener rolls back after the first full
+        /// delivery and commits after the redelivery; afterwards nothing further
+        /// should be received.
+        /// </summary>
+        [Test]
+        public void TestMessageListener()
+        {
+            // Send messages
+            for(int i = 0; i < MESSAGE_COUNT; i++)
+            {
+                producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i));
+            }
+
+            CommitTx();
+            consumer.Listener += new MessageListener(OnMessage);
+
+            // wait receive
+            WaitReceiveUnack();
+            // NUnit expects (expected, actual); this order keeps failure text accurate.
+            Assert.AreEqual(MESSAGE_COUNT, unackMessages.Count);
+
+            // resend phase
+            WaitReceiveAck();
+            Assert.AreEqual(MESSAGE_COUNT, ackMessages.Count);
+
+            // should no longer re-receive
+            consumer.Listener -= new MessageListener(OnMessage);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(500)));
+            Reconnect();
+        }
+
+        /// <summary>
+        /// Listener callback for <see cref="TestMessageListener"/>. Before the resend
+        /// phase it collects messages in <c>unackMessages</c>, rolls back once
+        /// MESSAGE_COUNT have arrived and sets <c>resendPhase</c>. In the resend phase
+        /// it collects them in <c>ackMessages</c> and commits once MESSAGE_COUNT have
+        /// arrived.
+        /// </summary>
+        /// <param name="message">The message delivered to the listener.</param>
+        /// <remarks>
+        /// Commit and rollback failures are still not rethrown, but they are written
+        /// to standard error so a later timeout in the test can be traced to its cause.
+        /// </remarks>
+        public void OnMessage(IMessage message)
+        {
+            if(!resendPhase)
+            {
+                unackMessages.AddLast(message);
+                if(unackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        RollbackTx();
+                        // Set only after a successful rollback.
+                        resendPhase = true;
+                    }
+                    catch(Exception ex)
+                    {
+                        Console.Error.WriteLine("OnMessage: rollback failed: " + ex);
+                    }
+                }
+            }
+            else
+            {
+                ackMessages.AddLast(message);
+                if(ackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        CommitTx();
+                    }
+                    catch(Exception ex)
+                    {
+                        Console.Error.WriteLine("OnMessage: commit failed: " + ex);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Polls up to 100 times at 100 ms intervals until <c>resendPhase</c> is set,
+        /// then asserts that it is set.
+        /// </summary>
+        private void WaitReceiveUnack()
+        {
+            int polls = 0;
+            while(polls < 100 && !resendPhase)
+            {
+                Thread.Sleep(100);
+                polls++;
+            }
+
+            Assert.IsTrue(resendPhase);
+        }
+
+        /// <summary>
+        /// Polls up to 100 times at 100 ms intervals until <c>ackMessages</c> holds
+        /// at least MESSAGE_COUNT entries, then asserts that it does.
+        /// </summary>
+        private void WaitReceiveAck()
+        {
+            int polls = 0;
+            while(polls < 100 && ackMessages.Count < MESSAGE_COUNT)
+            {
+                Thread.Sleep(100);
+                polls++;
+            }
+
+            Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT);
+        }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=896980&r1=896979&r2=896980&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Thu Jan 7 19:40:17 2010
@@ -91,5 +91,8 @@
<Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
<Compile Include="src\test\csharp\Util\MessageDispatchChannelTest.cs" />
<Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
+ <Compile Include="src\test\csharp\StompTopicTransactionTest.cs" />
+ <Compile Include="src\test\csharp\StompTransactionTestSupport.cs" />
+ <Compile Include="src\test\csharp\StompQueueTransactionTest.cs" />
</ItemGroup>
</Project>
\ No newline at end of file