You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/07 20:40:17 UTC

svn commit: r896980 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: ./ src/test/csharp/

Author: tabish
Date: Thu Jan  7 19:40:17 2010
New Revision: 896980

URL: http://svn.apache.org/viewvc?rev=896980&view=rev
Log:
Update the IndividualAckTest and add more Transaction Tests

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs?rev=896980&r1=896979&r2=896980&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/IndividualAckTest.cs Thu Jan  7 19:40:17 2010
@@ -73,7 +73,7 @@
             session.Close();
         }
 
-        //[Test]
+        [Test]
         public void TestLastMessageAcked()
         {
             ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
@@ -88,11 +88,11 @@
 
             // Consume the message...
             IMessageConsumer consumer = session.CreateConsumer(queue);
-            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNotNull(msg);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNotNull(msg);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNotNull(msg);
             msg.Acknowledge();
 
@@ -102,13 +102,13 @@
 
             // Attempt to Consume the message...
             consumer = session.CreateConsumer(queue);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNotNull(msg);
-            Assert.AreEqual(msg1,msg);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.AreEqual(msg1.Text, msg.Text);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNotNull(msg);
-            Assert.AreEqual(msg2,msg);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.AreEqual(msg2.Text, msg.Text);
+            msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
             Assert.IsNull(msg);
             session.Close();
         }

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs Thu Jan  7 19:40:17 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    [TestFixture]
+    public class StompQueueTransactionTest : StompTransactionTestSupport
+    {
+        public const String CLIENT_ID = "QueueTransactionTest";
+        public const String DESTINATION_NAME = "QueueTransactionTestDestination";
+
+        protected override bool Topic
+        {
+            get { return false; }
+        }
+
+        protected override String TestClientId
+        {
+            get { return CLIENT_ID + ":" + new Random().Next(); }
+        }
+
+        protected override String Subscription
+        {
+            get { return null; }
+        }
+
+        protected override String DestinationName
+        {
+            get { return DESTINATION_NAME; }
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompQueueTransactionTest.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs Thu Jan  7 19:40:17 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    [TestFixture]
+    public class StompTopicTransactionTest : StompTransactionTestSupport
+    {
+        public const String CLIENT_ID = "TopicTransactionTest";
+        public const String DESTINATION_NAME = "StompTopicTransactionTestDestination";
+        public const String SUBSCRIPTION_NAME = "TopicTransactionTest";
+
+        protected override bool Topic
+        {
+            get { return true; }
+        }
+
+        protected override String TestClientId
+        {
+            get { return CLIENT_ID; }
+        }
+
+        protected override String Subscription
+        {
+            get { return SUBSCRIPTION_NAME; }
+        }
+
+        protected override String DestinationName
+        {
+            get { return DESTINATION_NAME; }
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTopicTransactionTest.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs?rev=896980&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs Thu Jan  7 19:40:17 2010
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Stomp;
+using Apache.NMS.Stomp.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    public abstract class StompTransactionTestSupport : NMSTestSupport
+    {
+        private const int MESSAGE_COUNT = 5;
+        private const string MESSAGE_TEXT = "message";
+
+        private IConnection connection;
+        private ISession session;
+        private IMessageConsumer consumer;
+        private IMessageProducer producer;
+        private IDestination destination;
+
+        private int batchCount = 10;
+        private int batchSize = 20;
+
+        // for message listener test
+        private LinkedList<IMessage> unackMessages = new LinkedList<IMessage>();
+        private LinkedList<IMessage> ackMessages = new LinkedList<IMessage>();
+        private bool resendPhase;
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            this.resendPhase = false;
+
+            Reconnect();
+            PurgeDestination();
+        }
+
+        [TearDown]
+        public override void TearDown()
+        {
+            this.session.Close();
+            this.session = null;
+            this.connection.Close();
+            this.connection = null;
+
+            this.unackMessages.Clear();
+            this.ackMessages.Clear();
+
+            base.TearDown();
+
+            Thread.Sleep(500);
+        }
+
+        protected abstract bool Topic
+        {
+            get;
+        }
+
+        protected abstract String TestClientId
+        {
+            get;
+        }
+
+        protected abstract String Subscription
+        {
+            get;
+        }
+
+        protected abstract String DestinationName
+        {
+            get;
+        }
+
+        public override IConnection CreateConnection()
+        {
+            return CreateConnection(this.TestClientId);
+        }
+
+        protected void BeginTx()
+        {
+        }
+
+        protected void CommitTx()
+        {
+            session.Commit();
+        }
+
+        protected void RollbackTx()
+        {
+            session.Rollback();
+        }
+
+        [Test]
+        public void TestSendReceiveTransactedBatches()
+        {
+            ITextMessage message = session.CreateTextMessage("Batch IMessage");
+
+            for(int j = 0; j < batchCount; j++)
+            {
+                BeginTx();
+
+                for(int i = 0; i < batchSize; i++)
+                {
+                    producer.Send(message);
+                }
+
+                CommitTx();
+
+                BeginTx();
+                for(int i = 0; i < batchSize; i++)
+                {
+                    message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(5000));
+                    Assert.IsNotNull(message, "Received only " + i + " messages in batch " + j);
+                    Assert.AreEqual("Batch IMessage", message.Text);
+                }
+
+                CommitTx();
+            }
+        }
+
+        [Test]
+        public void TestSendRollback()
+        {
+            IMessage[] outbound = new IMessage[]
+            {session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")};
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // sends a message that gets rollbacked
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            RollbackTx();
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receives the first message
+            BeginTx();
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            messages.AddLast(message);
+
+            // receives the second message
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            messages.AddLast(message);
+
+            // validates that the rollbacked was not consumed
+            CommitTx();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        [Test]
+        public void TestSendSessionClose()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // sends a message that gets rollbacked
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+
+            ReconnectSession();
+
+            // sends a message
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receives the first message
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            messages.AddLast(message);
+
+            // receives the second message
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message);
+            messages.AddLast(message);
+
+            // validates that the rollbacked was not consumed
+            CommitTx();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        [Test]
+        public void TestSendSessionAndConnectionClose()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // sends a message that gets rollbacked
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+            session.Close();
+
+            Reconnect();
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receives the first message
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            messages.AddLast(message);
+
+            // receives the second message
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message);
+            messages.AddLast(message);
+
+            // validates that the rollbacked was not consumed
+            CommitTx();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        [Test]
+        public void TestReceiveRollback()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            // sent both messages
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            messages.AddLast(message);
+            AssertEquals(outbound[0], message);
+            CommitTx();
+
+            // Rollback so we can get that last message again.
+            BeginTx();
+            message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[1], message);
+            RollbackTx();
+
+            // Consume again.. the prev message should
+            // get redelivered.
+            BeginTx();
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the message again!");
+            messages.AddLast(message);
+            CommitTx();
+
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        [Test]
+        public void TestReceiveTwoThenRollback()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            AssertEquals(outbound[0], message);
+
+            message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[1], message);
+            RollbackTx();
+
+            // Consume again.. the prev message should
+            // get redelivered.
+            BeginTx();
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the first message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[0], message);
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the first message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[1], message);
+
+            Assert.IsNull(consumer.ReceiveNoWait());
+            CommitTx();
+
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        [Test]
+        public void TestSendReceiveWithPrefetchOne() {
+            SetPrefetchToOne();
+            ReconnectSession();
+
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage"),
+                session.CreateTextMessage("Third IMessage"),
+                session.CreateTextMessage("Fourth IMessage")};
+
+            BeginTx();
+            for(int i = 0; i < outbound.Length; i++)
+            {
+                // sends a message
+                producer.Send(outbound[i]);
+            }
+            CommitTx();
+
+            // receives the first message
+            BeginTx();
+
+            for(int i = 0; i < outbound.Length; i++)
+            {
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(message);
+            }
+
+            // validates that the rollbacked was not consumed
+            CommitTx();
+        }
+
+        [Test]
+        public void TestReceiveTwoThenRollbackManyTimes()
+        {
+            for(int i = 0; i < 5; i++)
+            {
+                TestReceiveTwoThenRollback();
+            }
+        }
+
+        [Test]
+        public void TestSendRollbackWithPrefetchOfOne()
+        {
+            SetPrefetchToOne();
+            TestSendRollback();
+        }
+
+        [Test]
+        public void TestReceiveRollbackWithPrefetchOfOne()
+        {
+            SetPrefetchToOne();
+            TestReceiveRollback();
+        }
+
+        [Test]
+        public void TestCloseConsumerBeforeCommit()
+        {
+            ITextMessage[] outbound = new ITextMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            // sends the messages
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            BeginTx();
+            ITextMessage message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.AreEqual(outbound[0].Text, message.Text);
+            // Close the consumer before the Commit. This should not cause the
+            // received message to Rollback.
+            consumer.Close();
+            CommitTx();
+
+            // Create a new consumer
+            consumer = CreateMessageConsumer();
+
+            BeginTx();
+            message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[1].Text, message.Text);
+            CommitTx();
+        }
+
+        protected void Reconnect()
+        {
+            if(this.connection != null)
+            {
+                // Close the prev connection.
+                this.connection.Close();
+                this.connection = null;
+            }
+
+            Thread.Sleep(500);
+
+            this.session = null;
+            this.connection = this.CreateConnection();
+
+            ReconnectSession();
+            this.connection.Start();
+        }
+
+        protected void ReconnectSession()
+        {
+            if(this.session != null)
+            {
+                this.session.Close();
+            }
+
+            this.session = this.connection.CreateSession(AcknowledgementMode.Transactional);
+
+            if(this.Topic == true)
+            {
+                this.destination = this.session.GetTopic(this.DestinationName);
+            }
+            else
+            {
+                this.destination = this.session.GetQueue(this.DestinationName);
+            }
+
+            this.producer = this.session.CreateProducer(destination);
+            this.consumer = CreateMessageConsumer();
+        }
+
+        protected void PurgeDestination()
+        {
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+        }
+
+        protected IMessageConsumer CreateMessageConsumer()
+        {
+            if(this.Subscription != null)
+            {
+                return this.session.CreateDurableConsumer((ITopic) destination, Subscription, null, false);
+            }
+            else
+            {
+                return this.session.CreateConsumer(destination);
+            }
+        }
+
+        protected void SetPrefetchToOne()
+        {
+            GetPrefetchPolicy().SetAll(1);
+        }
+
+        protected PrefetchPolicy GetPrefetchPolicy()
+        {
+            return ((Connection) connection).PrefetchPolicy;
+        }
+
+        [Test]
+        public void TestMessageListener()
+        {
+            // Send messages
+            for(int i = 0; i < MESSAGE_COUNT; i++)
+            {
+                producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i));
+            }
+
+            CommitTx();
+            consumer.Listener += new MessageListener(OnMessage);
+
+            // wait receive
+            WaitReceiveUnack();
+            Assert.AreEqual(unackMessages.Count, MESSAGE_COUNT);
+
+            // resend phase
+            WaitReceiveAck();
+            Assert.AreEqual(ackMessages.Count, MESSAGE_COUNT);
+
+            // should no longer re-receive
+            consumer.Listener -= new MessageListener(OnMessage);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(500)));
+            Reconnect();
+        }
+
+        public void OnMessage(IMessage message)
+        {
+            if(!resendPhase)
+            {
+                unackMessages.AddLast(message);
+                if(unackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        RollbackTx();
+                        resendPhase = true;
+                    }
+                    catch
+                    {
+                    }
+                }
+            }
+            else
+            {
+                ackMessages.AddLast(message);
+                if(ackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        CommitTx();
+                    }
+                    catch
+                    {
+                    }
+                }
+            }
+        }
+
+        private void WaitReceiveUnack()
+        {
+            for(int i = 0; i < 100 && !resendPhase; i++)
+            {
+                Thread.Sleep(100);
+            }
+
+            Assert.IsTrue(resendPhase);
+        }
+
+        private void WaitReceiveAck()
+        {
+            for(int i = 0; i < 100 && ackMessages.Count < MESSAGE_COUNT; i++)
+            {
+                Thread.Sleep(100);
+            }
+
+            Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT);
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/StompTransactionTestSupport.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=896980&r1=896979&r2=896980&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Thu Jan  7 19:40:17 2010
@@ -91,5 +91,8 @@
     <Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
     <Compile Include="src\test\csharp\Util\MessageDispatchChannelTest.cs" />
     <Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
+    <Compile Include="src\test\csharp\StompTopicTransactionTest.cs" />
+    <Compile Include="src\test\csharp\StompTransactionTestSupport.cs" />
+    <Compile Include="src\test\csharp\StompQueueTransactionTest.cs" />
   </ItemGroup>
 </Project>
\ No newline at end of file