You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/02 16:53:29 UTC

svn commit: r831931 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp: AMQQueueTransactionTest.cs AMQRedeliveryPolicyTest.cs AMQTopicTransactionTest.cs AMQTransactionTestSupport.cs MessageListenerRedeliveryTest.cs

Author: tabish
Date: Mon Nov  2 15:53:29 2009
New Revision: 831931

URL: http://svn.apache.org/viewvc?rev=831931&view=rev
Log:
Adding in several new Tests for Transactions and Redelivery

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs   (with props)

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs Mon Nov  2 15:53:29 2009
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    /// <summary>
+    /// Queue flavour of the transaction tests inherited from
+    /// <see cref="AMQTransactionTestSupport"/>: reports that it is not a
+    /// topic and supplies no subscription name.
+    /// </summary>
+    [TestFixture]
+    public class AMQQueueTransactionTest : AMQTransactionTestSupport
+    {
+        public const String CLIENT_ID = "QueueTransactionTest";
+
+        /// <summary>Always false for this fixture.</summary>
+        protected override bool Topic
+        {
+            get
+            {
+                return false;
+            }
+        }
+
+        /// <summary>Client id for this fixture (<see cref="CLIENT_ID"/>).</summary>
+        protected override string ClientId
+        {
+            get
+            {
+                return CLIENT_ID;
+            }
+        }
+
+        /// <summary>Always null: this fixture supplies no subscription name.</summary>
+        protected override string Subscription
+        {
+            get
+            {
+                return null;
+            }
+        }
+
+        /// <summary>Name of the destination used by this fixture.</summary>
+        protected override string DestinationName
+        {
+            get
+            {
+                return "AMQQueueTransactionTestDestination";
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Mon Nov  2 15:53:29 2009
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Policies;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    [TestFixture]    
+    public class AMQRedeliveryPolicyTest : NMSTestSupport
+    {
+        private const string DESTINATION_NAME = "RedeliveryPolicyTestDest";
+        
+        /// <summary>
+        /// With exponential back-off enabled (initial delay 500, multiplier 2),
+        /// checks that the "1st" message comes back immediately after the
+        /// first rollback and progressively later after each following one.
+        /// </summary>
+        [Test]
+        public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
+                redeliveryPolicy.InitialRedeliveryDelay = 500;
+                redeliveryPolicy.BackOffMultiplier = 2;
+                redeliveryPolicy.UseExponentialBackOff = true;
+                redeliveryPolicy.UseCollisionAvoidance = false;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+                IMessageProducer producer = session.CreateProducer(destination);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Publish both messages in a single committed transaction.
+                producer.Send(session.CreateTextMessage("1st"));
+                producer.Send(session.CreateTextMessage("2nd"));
+                session.Commit();
+
+                TimeSpan initialWait = TimeSpan.FromMilliseconds(1000);
+                TimeSpan shortWait = TimeSpan.FromMilliseconds(100);
+                TimeSpan mediumWait = TimeSpan.FromMilliseconds(500);
+                TimeSpan longWait = TimeSpan.FromMilliseconds(700);
+
+                // Initial delivery, rolled back.
+                ITextMessage received = (ITextMessage) consumer.Receive(initialWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Rollback();
+
+                // The first rollback is expected to redeliver without delay.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNotNull(received);
+                session.Rollback();
+
+                // After the second rollback nothing arrives within 100 ms,
+                // but the message does show up within the next 700 ms.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNull(received);
+
+                received = (ITextMessage) consumer.Receive(longWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Rollback();
+
+                // After the third rollback the wait is longer still: nothing
+                // within 100 ms + 500 ms, then delivery within a further 700 ms.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNull(received);
+                received = (ITextMessage) consumer.Receive(mediumWait);
+                Assert.IsNull(received);
+                received = (ITextMessage) consumer.Receive(longWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+            }
+        }
+
+        /// <summary>
+        /// With exponential back-off disabled and an initial delay of 500,
+        /// checks that redelivery after the first rollback is immediate and
+        /// that every later rollback is followed by the same wait.
+        /// </summary>
+        [Test]
+        public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
+                redeliveryPolicy.InitialRedeliveryDelay = 500;
+                redeliveryPolicy.UseExponentialBackOff = false;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+
+                IMessageProducer producer = session.CreateProducer(destination);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Publish both messages in a single committed transaction.
+                producer.Send(session.CreateTextMessage("1st"));
+                producer.Send(session.CreateTextMessage("2nd"));
+                session.Commit();
+
+                TimeSpan initialWait = TimeSpan.FromMilliseconds(1000);
+                TimeSpan shortWait = TimeSpan.FromMilliseconds(100);
+                TimeSpan longWait = TimeSpan.FromMilliseconds(700);
+
+                // Initial delivery, rolled back.
+                ITextMessage received = (ITextMessage) consumer.Receive(initialWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Rollback();
+
+                // The first rollback is expected to redeliver without delay.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNotNull(received);
+                session.Rollback();
+
+                // After the second rollback nothing arrives within 100 ms,
+                // but the message does show up within the next 700 ms.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNull(received);
+                received = (ITextMessage) consumer.Receive(longWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Rollback();
+
+                // Without exponential back-off the same timing window applies
+                // again after the third rollback.
+                received = (ITextMessage) consumer.Receive(shortWait);
+                Assert.IsNull(received);
+                received = (ITextMessage) consumer.Receive(longWait);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+            }
+        }
+    
+        /// <summary>
+        /// With MaximumRedeliveries set to 2, rolls back three deliveries of
+        /// the "1st" message and then checks that the main consumer moves on
+        /// to "2nd" while "1st" can be read from the "ActiveMQ.DLQ" queue.
+        /// </summary>
+        [Test]
+        public void TestDLQHandling()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
+                redeliveryPolicy.InitialRedeliveryDelay = 100;
+                redeliveryPolicy.UseExponentialBackOff = false;
+                redeliveryPolicy.MaximumRedeliveries = 2;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+                IMessageProducer producer = session.CreateProducer(destination);
+
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+                IMessageConsumer dlqConsumer = session.CreateConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+                // Publish both messages in a single committed transaction.
+                producer.Send(session.CreateTextMessage("1st"));
+                producer.Send(session.CreateTextMessage("2nd"));
+                session.Commit();
+
+                TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);
+                ITextMessage received;
+
+                // Three deliveries of "1st" (the original plus two
+                // redeliveries), each one rolled back.
+                for(int delivery = 0; delivery < 3; delivery++)
+                {
+                    received = (ITextMessage) consumer.Receive(receiveTimeout);
+                    Assert.IsNotNull(received);
+                    Assert.AreEqual("1st", received.Text);
+                    session.Rollback();
+                }
+
+                // The final rollback should have pushed "1st" to the DLQ, so
+                // the next message on the destination is "2nd".
+                received = (ITextMessage) consumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("2nd", received.Text);
+                session.Commit();
+
+                // "1st" should now be readable from the dead letter queue.
+                received = (ITextMessage) dlqConsumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Commit();
+            }
+        }
+
+        /// <summary>
+        /// With MaximumRedeliveries set to -1 (no maximum), checks that the
+        /// "1st" message keeps coming back after five consecutive rollbacks
+        /// and that "2nd" follows once "1st" is finally committed.
+        /// </summary>
+        [Test]
+        public void TestInfiniteMaximumNumberOfRedeliveries()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
+                redeliveryPolicy.InitialRedeliveryDelay = 100;
+                redeliveryPolicy.UseExponentialBackOff = false;
+                // -1 means there is no upper bound on redeliveries.
+                redeliveryPolicy.MaximumRedeliveries = -1;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+                IMessageProducer producer = session.CreateProducer(destination);
+
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Publish both messages in a single committed transaction.
+                producer.Send(session.CreateTextMessage("1st"));
+                producer.Send(session.CreateTextMessage("2nd"));
+                session.Commit();
+
+                TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);
+                ITextMessage received;
+
+                // "1st" must be handed back every time until it is committed;
+                // roll it back five times in a row.
+                for(int round = 0; round < 5; round++)
+                {
+                    received = (ITextMessage) consumer.Receive(receiveTimeout);
+                    Assert.IsNotNull(received);
+                    Assert.AreEqual("1st", received.Text);
+                    session.Rollback();
+                }
+
+                // Sixth delivery of "1st": commit it this time.
+                received = (ITextMessage) consumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Commit();
+
+                // Only now does "2nd" become the next message.
+                received = (ITextMessage) consumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("2nd", received.Text);
+                session.Commit();
+            }
+        }
+
+        /// <summary>
+        /// With MaximumRedeliveries set to 0, checks that a rolled-back "1st"
+        /// message is not delivered again and the consumer gets "2nd" next.
+        /// </summary>
+        [Test]
+        public void TestZeroMaximumNumberOfRedeliveries()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
+                redeliveryPolicy.InitialRedeliveryDelay = 100;
+                redeliveryPolicy.UseExponentialBackOff = false;
+                // Zero redeliveries allowed.
+                redeliveryPolicy.MaximumRedeliveries = 0;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+                IMessageProducer producer = session.CreateProducer(destination);
+
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Publish both messages in a single committed transaction.
+                producer.Send(session.CreateTextMessage("1st"));
+                producer.Send(session.CreateTextMessage("2nd"));
+                session.Commit();
+
+                TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);
+
+                ITextMessage received = (ITextMessage) consumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("1st", received.Text);
+                session.Rollback();
+
+                // "1st" must not come back, so the next message is "2nd".
+                received = (ITextMessage) consumer.Receive(receiveTimeout);
+                Assert.IsNotNull(received);
+                Assert.AreEqual("2nd", received.Text);
+                session.Commit();
+            }
+        }
+        
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs Mon Nov  2 15:53:29 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    /// <summary>
+    /// Topic flavour of the transaction tests inherited from
+    /// <see cref="AMQTransactionTestSupport"/>: reports that it is a topic and
+    /// supplies a subscription name.
+    /// </summary>
+    [TestFixture]
+    public class AMQTopicTransactionTest : AMQTransactionTestSupport
+    {
+        public const String CLIENT_ID = "TopicTransactionTest";
+        public const String DESTINATION_NAME = "AMQTopicTransactionTestDestination";
+        public const String SUBSCRIPTION_NAME = "TopicTransactionTest";
+
+        /// <summary>Always true for this fixture.</summary>
+        protected override bool Topic
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        /// <summary>Client id for this fixture (<see cref="CLIENT_ID"/>).</summary>
+        protected override string ClientId
+        {
+            get
+            {
+                return CLIENT_ID;
+            }
+        }
+
+        /// <summary>Subscription name (<see cref="SUBSCRIPTION_NAME"/>).</summary>
+        protected override string Subscription
+        {
+            get
+            {
+                return SUBSCRIPTION_NAME;
+            }
+        }
+
+        /// <summary>Destination name (<see cref="DESTINATION_NAME"/>).</summary>
+        protected override string DestinationName
+        {
+            get
+            {
+                return DESTINATION_NAME;
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs Mon Nov  2 15:53:29 2009
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    public abstract class AMQTransactionTestSupport : NMSTestSupport
+    {
+        // NOTE(review): MESSAGE_COUNT and MESSAGE_TEXT are not referenced in the
+        // part of this class visible here; presumably later tests use them.
+        private const int MESSAGE_COUNT = 5;
+        private const string MESSAGE_TEXT = "message";
+
+        // Factory created in SetUp and used by CreateConnection.
+        private IConnectionFactory connectionFactory;
+        // Connection and session shared by the tests; both are closed and
+        // reset to null in TearDown.
+        private IConnection connection;
+        private ISession session;
+        // Consumer and producer used by every test in this class.
+        private IMessageConsumer consumer;
+        private IMessageProducer producer;
+        private IDestination destination;
+        
+        // Number of batches, and messages per batch, used by
+        // TestSendReceiveTransactedBatches.
+        private int batchCount = 10;
+        private int batchSize = 20;
+    
+        // for message listener test
+        // (both lists are emptied in TearDown; resendPhase is reset to false in SetUp)
+        private LinkedList<IMessage> unackMessages = new LinkedList<IMessage>();
+        private LinkedList<IMessage> ackMessages = new LinkedList<IMessage>();
+        private bool resendPhase;
+
+        /// <summary>
+        /// Per-test initialisation: runs the base set-up, resets the resend
+        /// flag, creates a fresh connection factory and then calls Reconnect().
+        /// </summary>
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            resendPhase = false;
+            connectionFactory = new ConnectionFactory();
+
+            Reconnect();
+        }
+
+        /// <summary>
+        /// Per-test clean-up: closes the session and the connection, empties
+        /// the message lists and runs the base tear-down.
+        /// </summary>
+        /// <remarks>
+        /// Each step runs even when an earlier one throws, so a failing
+        /// session close cannot leave the connection open or skip the base
+        /// tear-down. A session or connection that is already null is skipped
+        /// instead of raising a NullReferenceException.
+        /// </remarks>
+        [TearDown]
+        public override void TearDown()
+        {
+            try
+            {
+                try
+                {
+                    if(this.session != null)
+                    {
+                        this.session.Close();
+                    }
+                }
+                finally
+                {
+                    this.session = null;
+
+                    // Close the connection even if closing the session failed.
+                    if(this.connection != null)
+                    {
+                        this.connection.Close();
+                    }
+                }
+            }
+            finally
+            {
+                this.connection = null;
+
+                this.unackMessages.Clear();
+                this.ackMessages.Clear();
+
+                base.TearDown();
+            }
+        }
+
+        /// <summary>
+        /// Whether the concrete fixture is the topic variant; the topic
+        /// fixture returns true and the queue fixture returns false.
+        /// </summary>
+        protected abstract bool Topic { get; }
+
+        /// <summary>Client id supplied by the concrete fixture.</summary>
+        protected abstract String ClientId { get; }
+
+        /// <summary>
+        /// Subscription name supplied by the concrete fixture; the queue
+        /// fixture returns null.
+        /// </summary>
+        protected abstract String Subscription { get; }
+
+        /// <summary>Destination name supplied by the concrete fixture.</summary>
+        protected abstract String DestinationName { get; }
+        
+        /// <summary>
+        /// Creates a connection from the factory built in SetUp.
+        /// </summary>
+        /// <returns>The connection returned by the factory.</returns>
+        public override IConnection CreateConnection()
+        {
+            IConnection created = connectionFactory.CreateConnection();
+            return created;
+        }
+
+        /// <summary>
+        /// Marks the point in a test where a transaction starts.
+        /// </summary>
+        protected void BeginTx()
+        {
+            // Nothing to do here.
+            // NOTE(review): presumably the session starts its transaction
+            // implicitly - confirm against the session creation code.
+        }
+    
+        /// <summary>
+        /// Commits the work done on the shared session.
+        /// </summary>
+        protected void CommitTx()
+        {
+            this.session.Commit();
+        }
+    
+        /// <summary>
+        /// Rolls back the work done on the shared session.
+        /// </summary>
+        protected void RollbackTx()
+        {
+            this.session.Rollback();
+        }
+
+        /// <summary>
+        /// Sends and then receives batchCount batches of batchSize messages,
+        /// committing after each send phase and after each receive phase.
+        /// </summary>
+        /// <remarks>
+        /// The outbound message is kept in its own variable so that every
+        /// batch sends the message created for sending, never one that was
+        /// received in an earlier batch.
+        /// </remarks>
+        [Test]
+        public void TestSendReceiveTransactedBatches()
+        {
+            ITextMessage outbound = session.CreateTextMessage("Batch IMessage");
+
+            for(int j = 0; j < batchCount; j++)
+            {
+                // Produce one full batch inside a single transaction.
+                BeginTx();
+
+                for(int i = 0; i < batchSize; i++)
+                {
+                    producer.Send(outbound);
+                }
+
+                CommitTx();
+
+                // Consume the same batch inside a second transaction.
+                BeginTx();
+
+                for(int i = 0; i < batchSize; i++)
+                {
+                    ITextMessage inbound = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(5000));
+                    Assert.IsNotNull(inbound, "Received only " + i + " messages in batch " + j);
+                    Assert.AreEqual("Batch IMessage", inbound.Text);
+                }
+
+                CommitTx();
+            }
+        }
+    
+        /// <summary>
+        /// Sends three messages, rolling back the middle one, and checks that
+        /// only the two committed messages are received, in order.
+        /// </summary>
+        /// <remarks>
+        /// Each receive is checked for null so that a timeout fails on the
+        /// receive that timed out, not later inside the message comparison.
+        /// </remarks>
+        [Test]
+        public void TestSendRollback()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // sends a message that gets rolled back
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            RollbackTx();
+
+            // sends a message
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receives the first message
+            BeginTx();
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message, "Did not receive the first committed message.");
+            messages.AddLast(message);
+
+            // receives the second message
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Did not receive the second committed message.");
+            messages.AddLast(message);
+
+            // validates that the rolled back message was not consumed
+            CommitTx();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+    
+        /// <summary>
+        /// Sends a message in a transaction that is never committed, closes
+        /// the consumer and calls ReconnectSession(), then checks that only
+        /// the two committed messages are received.
+        /// </summary>
+        [Test]
+        public void TestSendSessionClose()
+        {
+            IMessage firstSent = session.CreateTextMessage("First IMessage");
+            IMessage secondSent = session.CreateTextMessage("Second IMessage");
+            IMessage[] outbound = new IMessage[] { firstSent, secondSent };
+
+            // Committed send of the first message.
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // Uncommitted send: the session is replaced before any commit.
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+
+            ReconnectSession();
+
+            // Committed send of the second message.
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // Both committed messages should arrive, in order.
+            BeginTx();
+            IMessage firstReceived = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(firstReceived);
+
+            IMessage secondReceived = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(secondReceived);
+
+            // The uncommitted message must not be among the received ones.
+            CommitTx();
+            IMessage[] inbound = new IMessage[] { firstReceived, secondReceived };
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Sends a message in a transaction that is never committed, closes
+        /// the consumer and the session, calls Reconnect(), then checks that
+        /// only the two committed messages are received.
+        /// </summary>
+        [Test]
+        public void TestSendSessionAndConnectionClose()
+        {
+            IMessage firstSent = session.CreateTextMessage("First IMessage");
+            IMessage secondSent = session.CreateTextMessage("Second IMessage");
+            IMessage[] outbound = new IMessage[] { firstSent, secondSent };
+
+            // Committed send of the first message.
+            BeginTx();
+            producer.Send(outbound[0]);
+            CommitTx();
+
+            // Uncommitted send: consumer and session are closed before any commit.
+            BeginTx();
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Close();
+            session.Close();
+
+            // NOTE(review): presumably this also replaces the connection - confirm in Reconnect().
+            Reconnect();
+
+            // Committed send of the second message.
+            BeginTx();
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // Both committed messages should arrive, in order.
+            BeginTx();
+            IMessage firstReceived = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(firstReceived);
+
+            IMessage secondReceived = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(secondReceived);
+
+            // The uncommitted message must not be among the received ones.
+            CommitTx();
+            IMessage[] inbound = new IMessage[] { firstReceived, secondReceived };
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Receives and commits the first of two messages, receives and rolls
+        /// back the second, then checks that the second one is delivered again.
+        /// </summary>
+        [Test]
+        public void TestReceiveRollback()
+        {
+            IMessage firstSent = session.CreateTextMessage("First IMessage");
+            IMessage secondSent = session.CreateTextMessage("Second IMessage");
+            IMessage[] outbound = new IMessage[] { firstSent, secondSent };
+
+            // Drain anything left over from earlier test runs, committing
+            // only when something was actually consumed.
+            BeginTx();
+            bool drainedAny = false;
+            for(IMessage stale = consumer.ReceiveNoWait(); stale != null; stale = consumer.ReceiveNoWait())
+            {
+                drainedAny = true;
+            }
+
+            if(drainedAny)
+            {
+                CommitTx();
+            }
+
+            // Send both messages in one committed transaction.
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // Consume and commit the first message.
+            BeginTx();
+            IMessage firstReceived = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(firstReceived);
+            AssertEquals(outbound[0], firstReceived);
+            CommitTx();
+
+            // Consume the second message, then roll back so that it is
+            // handed out again.
+            BeginTx();
+            IMessage rolledBack = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(rolledBack);
+            AssertEquals(outbound[1], rolledBack);
+            RollbackTx();
+
+            // The rolled back message should be redelivered.
+            BeginTx();
+            IMessage redelivered = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(redelivered, "Should have re-received the message again!");
+            CommitTx();
+
+            IMessage[] inbound = new IMessage[] { firstReceived, redelivered };
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+
+        /// <summary>
+        /// Receives two messages in one transaction, rolls it back, and checks
+        /// that both are redelivered in order with nothing left afterwards.
+        /// </summary>
+        /// <remarks>
+        /// Every receive is null-checked before the message is compared, and
+        /// each redelivery assertion names the message it is checking.
+        /// </remarks>
+        [Test]
+        public void TestReceiveTwoThenRollback()
+        {
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            // send both messages in one committed transaction
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            // receive both messages, then roll the transaction back
+            LinkedList<IMessage> messages = new LinkedList<IMessage>();
+            BeginTx();
+            IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[0], message);
+
+            message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            AssertEquals(outbound[1], message);
+            RollbackTx();
+
+            // Consume again.. both rolled back messages should
+            // get redelivered, in their original order.
+            BeginTx();
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the first message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[0], message);
+            message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+            Assert.IsNotNull(message, "Should have re-received the second message again!");
+            messages.AddLast(message);
+            AssertEquals(outbound[1], message);
+
+            // nothing else should be waiting for this consumer
+            Assert.IsNull(consumer.ReceiveNoWait());
+            CommitTx();
+
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound, 0);
+            AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+        }
+    
+        // Sets every prefetch limit to one, recreates the session, then sends four
+        // messages in one transaction and expects to receive four messages in the
+        // following transaction.
+        [Test]
+        public void TestSendReceiveWithPrefetchOne()
+        {
+            SetPrefetchToOne();
+            ReconnectSession();
+
+            IMessage[] outbound = new IMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage"),
+                session.CreateTextMessage("Third IMessage"),
+                session.CreateTextMessage("Fourth IMessage")};
+
+            // Publish the whole batch in a single transaction.
+            BeginTx();
+            foreach(IMessage pending in outbound)
+            {
+                producer.Send(pending);
+            }
+            CommitTx();
+
+            // Expect one delivery per message sent, each within one second.
+            BeginTx();
+            int remaining = outbound.Length;
+            while(remaining > 0)
+            {
+                IMessage received = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(received);
+                remaining--;
+            }
+
+            CommitTx();
+        }
+    
+        // Runs the TestReceiveTwoThenRollback scenario five times back to back on
+        // the same fixture instance.
+        [Test]
+        public void TestReceiveTwoThenRollbackManyTimes()
+        {
+            int run = 0;
+            while(run < 5)
+            {
+                this.TestReceiveTwoThenRollback();
+                run++;
+            }
+        }
+    
+        // Repeats the TestSendRollback scenario after setting every prefetch limit
+        // to one.
+        // NOTE(review): unlike TestSendReceiveWithPrefetchOne, no ReconnectSession()
+        // follows the prefetch change - confirm the existing consumer honours it.
+        [Test]
+        public void TestSendRollbackWithPrefetchOfOne()
+        {
+            this.SetPrefetchToOne();
+            this.TestSendRollback();
+        }
+    
+        // Repeats the TestReceiveRollback scenario after setting every prefetch
+        // limit to one.
+        // NOTE(review): unlike TestSendReceiveWithPrefetchOne, no ReconnectSession()
+        // follows the prefetch change - confirm the existing consumer honours it.
+        [Test]
+        public void TestReceiveRollbackWithPrefetchOfOne()
+        {
+            this.SetPrefetchToOne();
+            this.TestReceiveRollback();
+        }
+
+        // Verifies that closing a consumer before committing the transaction does
+        // not roll back the message it received: after the commit, a fresh consumer
+        // must see only the second message.
+        [Test]
+        public void TestCloseConsumerBeforeCommit()
+        {
+            ITextMessage[] outbound = new ITextMessage[] {
+                session.CreateTextMessage("First IMessage"),
+                session.CreateTextMessage("Second IMessage")};
+
+            // lets consume any outstanding messages from prev test runs
+            BeginTx();
+            bool needCommit = false;
+            while(consumer.ReceiveNoWait() != null)
+            {
+                needCommit = true;
+            }
+
+            if(needCommit)
+            {
+                CommitTx();
+            }
+
+            // sends the messages
+            BeginTx();
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            CommitTx();
+
+            BeginTx();
+            ITextMessage message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            // A receive timeout yields null; fail with an assertion instead of a
+            // NullReferenceException when reading message.Text below.
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[0].Text, message.Text);
+            // Close the consumer before the Commit. This should not cause the
+            // received message to Rollback.
+            consumer.Close();
+            CommitTx();
+
+            // Create a new consumer
+            consumer = CreateMessageConsumer();
+
+            BeginTx();
+            message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[1].Text, message.Text);
+            CommitTx();
+        }
+    
+        // Closes the current connection (if there is one), creates a new connection
+        // from the factory, applies the configured client id, rebuilds the session
+        // via ReconnectSession() and finally starts the new connection.
+        protected void Reconnect()
+        {
+            // Close the prev connection, if any.
+            if(connection != null)
+            {
+                connection.Close();
+                connection = null;
+            }
+
+            // Clear the session reference so ReconnectSession() does not close it again.
+            session = null;
+            connection = connectionFactory.CreateConnection();
+
+            if(ClientId != null)
+            {
+                connection.ClientId = ClientId;
+            }
+
+            ReconnectSession();
+            connection.Start();
+        }
+    
+        // Closes the current session (if there is one) and creates a new
+        // transactional session on the current connection, then re-resolves the
+        // destination and recreates the producer and consumer on it.
+        protected void ReconnectSession()
+        {
+            if(session != null)
+            {
+                session.Close();
+            }
+
+            session = connection.CreateSession(AcknowledgementMode.Transactional);
+
+            // The Topic flag selects between a topic and a queue named DestinationName.
+            if(Topic == true)
+            {
+                destination = session.GetTopic(DestinationName);
+            }
+            else
+            {
+                destination = session.GetQueue(DestinationName);
+            }
+
+            producer = session.CreateProducer(destination);
+            consumer = CreateMessageConsumer();
+        }
+        
+        // Creates a consumer on the current destination: a plain consumer when no
+        // Subscription name is set, otherwise a durable consumer registered under
+        // that name (the destination is then cast to ITopic).
+        protected IMessageConsumer CreateMessageConsumer()
+        {
+            if(Subscription == null)
+            {
+                return session.CreateConsumer(destination);
+            }
+
+            return session.CreateDurableConsumer((ITopic) destination, Subscription, null, false);
+        }
+
+        // Sets every limit of the connection's prefetch policy to 1.
+        protected void SetPrefetchToOne()
+        {
+            PrefetchPolicy policy = GetPrefetchPolicy();
+            policy.SetAll(1);
+        }
+    
+        // Returns the prefetch policy of the current connection, which is cast to
+        // the concrete Connection type to reach the PrefetchPolicy property.
+        protected PrefetchPolicy GetPrefetchPolicy()
+        {
+            Connection concreteConnection = (Connection) connection;
+            return concreteConnection.PrefetchPolicy;
+        }
+    
+        // Sends MESSAGE_COUNT messages, commits, and then attaches OnMessage as the
+        // consumer's listener. OnMessage rolls the transaction back once it has seen
+        // MESSAGE_COUNT messages and commits after seeing MESSAGE_COUNT messages
+        // again; this test waits for both phases and then checks that nothing more
+        // is delivered.
+        [Test]
+        public void TestMessageListener()
+        {
+            // Send messages
+            for(int i = 0; i < MESSAGE_COUNT; i++)
+            {
+                producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i));
+            }
+
+            CommitTx();
+            consumer.Listener += new MessageListener(OnMessage);
+
+            // wait receive
+            WaitReceiveUnack();
+            // NUnit's AreEqual takes (expected, actual); keep that order so a
+            // failure message reports the right value as "expected".
+            Assert.AreEqual(MESSAGE_COUNT, unackMessages.Count);
+
+            // resend phase
+            WaitReceiveAck();
+            Assert.AreEqual(MESSAGE_COUNT, ackMessages.Count);
+
+            // should no longer re-receive
+            consumer.Listener -= new MessageListener(OnMessage);
+            Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(500)));
+            Reconnect();
+        }
+    
+        // Listener callback used by TestMessageListener.
+        // Before the resend phase, each message is recorded in unackMessages; once
+        // MESSAGE_COUNT have arrived the transaction is rolled back and the resend
+        // phase begins. During the resend phase, each message is recorded in
+        // ackMessages; once MESSAGE_COUNT have arrived the transaction is committed.
+        // Rollback/commit failures are reported on stderr instead of being thrown
+        // out of the callback.
+        public void OnMessage(IMessage message)
+        {
+            if(!resendPhase)
+            {
+                unackMessages.AddLast(message);
+                if(unackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        RollbackTx();
+                        // Only flip the phase once the rollback actually succeeded.
+                        resendPhase = true;
+                    }
+                    catch(System.Exception ex)
+                    {
+                        // Do not let the callback throw, but leave a trace so a
+                        // later timeout in the test can be diagnosed.
+                        System.Console.Error.WriteLine("OnMessage: rollback failed: " + ex);
+                    }
+                }
+            }
+            else
+            {
+                ackMessages.AddLast(message);
+                if(ackMessages.Count == MESSAGE_COUNT)
+                {
+                    try
+                    {
+                        CommitTx();
+                    }
+                    catch(System.Exception ex)
+                    {
+                        // Same policy as above: report, do not rethrow.
+                        System.Console.Error.WriteLine("OnMessage: commit failed: " + ex);
+                    }
+                }
+            }
+        }
+    
+        // Polls every 100 ms, at most 100 times, until the resendPhase flag is set,
+        // then asserts that it is set.
+        private void WaitReceiveUnack()
+        {
+            int attempts = 0;
+            while(attempts < 100 && !resendPhase)
+            {
+                Thread.Sleep(100);
+                attempts++;
+            }
+
+            Assert.IsTrue(resendPhase);
+        }
+    
+        // Polls every 100 ms, at most 100 times, until ackMessages holds at least
+        // MESSAGE_COUNT entries, then asserts that it does.
+        private void WaitReceiveAck()
+        {
+            int attempts = 0;
+            while(attempts < 100 && ackMessages.Count < MESSAGE_COUNT)
+            {
+                Thread.Sleep(100);
+                attempts++;
+            }
+
+            Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT);
+        }
+        
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs Mon Nov  2 15:53:29 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Policies;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    // Checks that a message rolled back from a message listener on a transacted
+    // session is redelivered according to the redelivery policy configured on the
+    // connection, and that a committed message is not redelivered.
+    [TestFixture]
+    public class MessageListenerRedeliveryTest : NMSTestSupport
+    {
+        private Connection connection;
+        // Incremented by OnMessageListener and polled by the test method. The
+        // listener is presumably dispatched on a different thread than the test,
+        // so the field is volatile to keep the polled value fresh.
+        private volatile int counter;
+        private ISession session;
+
+        // Creates the connection, installs the redelivery policy under test and
+        // resets the delivery counter.
+        [SetUp]
+        public override void SetUp()
+        {
+            this.connection = (Connection) CreateConnection();
+            this.connection.RedeliveryPolicy = GetRedeliveryPolicy();
+
+            this.counter = 0;
+        }
+
+        // Drops the session reference, closes the connection and then runs the
+        // base class tear down.
+        [TearDown]
+        public override void TearDown()
+        {
+            this.session = null;
+
+            try
+            {
+                if(this.connection != null)
+                {
+                    this.connection.Close();
+                    this.connection = null;
+                }
+            }
+            finally
+            {
+                // Run the base tear down even when closing the connection throws.
+                base.TearDown();
+            }
+        }
+
+        // Builds the policy used by the test: initial delay 1000, at most 3
+        // redeliveries, exponential back-off with a multiplier of 2.
+        protected IRedeliveryPolicy GetRedeliveryPolicy()
+        {
+            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+            redeliveryPolicy.InitialRedeliveryDelay = 1000;
+            redeliveryPolicy.MaximumRedeliveries = 3;
+            redeliveryPolicy.BackOffMultiplier = (short)2;
+            redeliveryPolicy.UseExponentialBackOff = true;
+            return redeliveryPolicy;
+        }
+
+        // Listener callback: counts every delivery. The first four deliveries are
+        // rolled back; any later delivery is acknowledged and committed.
+        private void OnMessageListener(IMessage message)
+        {
+            counter++;
+            if(this.counter <= 4)
+            {
+                session.Rollback();
+            }
+            else
+            {
+                message.Acknowledge();
+                session.Commit();
+            }
+        }
+
+        // Sends one message to a temporary queue, lets the listener roll it back
+        // repeatedly and checks the delivery count at fixed points in time; then
+        // sends a second message that the listener commits and checks that it is
+        // delivered exactly once.
+        [Test]
+        public void TestQueueRollbackConsumerListener()
+        {
+            connection.Start();
+
+            this.session = connection.CreateSession(AcknowledgementMode.Transactional);
+            ITemporaryQueue queue = session.CreateTemporaryQueue();
+            IMessageProducer producer = session.CreateProducer(queue);
+            IMessage message = session.CreateTextMessage("Test Message");
+            producer.Send(message);
+            session.Commit();
+
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+
+            consumer.Listener += new MessageListener(OnMessageListener);
+
+            Thread.Sleep(500);
+
+            // first try.. should get 2 since there is no delay on the
+            // first redeliver..
+            Assert.AreEqual(2, counter);
+
+            Thread.Sleep(1000);
+
+            // 2nd redeliver (redelivery after 1 sec)
+            Assert.AreEqual(3, counter);
+
+            Thread.Sleep(2000);
+
+            // 3rd redeliver (redelivery after 2 seconds) - it should give up after
+            // that
+            Assert.AreEqual(4, counter);
+
+            // create new message
+            producer.Send(session.CreateTextMessage("Test Message Again"));
+            session.Commit();
+
+            Thread.Sleep(500);
+
+            // it should be committed, so no redelivery
+            Assert.AreEqual(5, counter);
+
+            Thread.Sleep(1500);
+
+            // no redelivery, counter should still be 5
+            Assert.AreEqual(5, counter);
+
+            session.Close();
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native