You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/02 16:53:29 UTC
svn commit: r831931 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp:
AMQQueueTransactionTest.cs AMQRedeliveryPolicyTest.cs
AMQTopicTransactionTest.cs AMQTransactionTestSupport.cs
MessageListenerRedeliveryTest.cs
Author: tabish
Date: Mon Nov 2 15:53:29 2009
New Revision: 831931
URL: http://svn.apache.org/viewvc?rev=831931&view=rev
Log:
Adding in several new Tests for Transactions and Redelivery
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs (with props)
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs Mon Nov 2 15:53:29 2009
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+ [TestFixture]
+ public class AMQQueueTransactionTest : AMQTransactionTestSupport
+ {
+ public const String CLIENT_ID = "QueueTransactionTest";
+
+ protected override bool Topic
+ {
+ get { return false; }
+ }
+
+ protected override String ClientId
+ {
+ get { return CLIENT_ID; }
+ }
+
+ protected override String Subscription
+ {
+ get { return null; }
+ }
+
+ protected override String DestinationName
+ {
+ get { return "AMQQueueTransactionTestDestination"; }
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQQueueTransactionTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Mon Nov 2 15:53:29 2009
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.Policies;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+ [TestFixture]
+ public class AMQRedeliveryPolicyTest : NMSTestSupport
+ {
+ private const string DESTINATION_NAME = "RedeliveryPolicyTestDest";
+
+ [Test]
+ public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.InitialRedeliveryDelay = 500;
+ policy.BackOffMultiplier = 2;
+ policy.UseExponentialBackOff = true;
+ policy.UseCollisionAvoidance = false;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageProducer producer = session.CreateProducer(destination);
+
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // Send the messages
+ producer.Send(session.CreateTextMessage("1st"));
+ producer.Send(session.CreateTextMessage("2nd"));
+ session.Commit();
+
+ ITextMessage m;
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // No delay on first Rollback..
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNotNull(m);
+ session.Rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // Show re-delivery delay is incrementing exponentially
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(500));
+ Assert.IsNull(m);
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ }
+ }
+
+ [Test]
+ public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.InitialRedeliveryDelay = 500;
+ policy.UseExponentialBackOff = false;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+
+ IMessageProducer producer = session.CreateProducer(destination);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // Send the messages
+ producer.Send(session.CreateTextMessage("1st"));
+ producer.Send(session.CreateTextMessage("2nd"));
+ session.Commit();
+
+ ITextMessage m;
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // No delay on first Rollback..
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNotNull(m);
+ session.Rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // The message gets redelivered after 500 ms every time since
+ // we are not using exponential backoff.
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ }
+ }
+
+ [Test]
+ public void TestDLQHandling()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.InitialRedeliveryDelay = 100;
+ policy.UseExponentialBackOff = false;
+ policy.MaximumRedeliveries = 2;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageProducer producer = session.CreateProducer(destination);
+
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageConsumer dlqConsumer = session.CreateConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
+
+ // Send the messages
+ producer.Send(session.CreateTextMessage("1st"));
+ producer.Send(session.CreateTextMessage("2nd"));
+ session.Commit();
+
+ ITextMessage m;
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // The last Rollback should cause the 1st message to get sent to the DLQ
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("2nd", m.Text);
+ session.Commit();
+
+ // We should be able to get the message off the DLQ now.
+ m = (ITextMessage)dlqConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Commit();
+ }
+ }
+
+ [Test]
+ public void TestInfiniteMaximumNumberOfRedeliveries()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.InitialRedeliveryDelay = 100;
+ policy.UseExponentialBackOff = false;
+ // let's set the maximum redeliveries to no maximum (ie. infinite)
+ policy.MaximumRedeliveries = -1;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageProducer producer = session.CreateProducer(destination);
+
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // Send the messages
+ producer.Send(session.CreateTextMessage("1st"));
+ producer.Send(session.CreateTextMessage("2nd"));
+ session.Commit();
+
+ ITextMessage m;
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ //we should be able to get the 1st message redelivered until a session.Commit is called
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Commit();
+
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("2nd", m.Text);
+ session.Commit();
+ }
+ }
+
+ [Test]
+ public void TestZeroMaximumNumberOfRedeliveries()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.InitialRedeliveryDelay = 100;
+ policy.UseExponentialBackOff = false;
+ //let's set the maximum redeliveries to 0
+ policy.MaximumRedeliveries = 0;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageProducer producer = session.CreateProducer(destination);
+
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // Send the messages
+ producer.Send(session.CreateTextMessage("1st"));
+ producer.Send(session.CreateTextMessage("2nd"));
+ session.Commit();
+
+ ITextMessage m;
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ //the 1st message should not be redelivered since maximumRedeliveries is set to 0
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("2nd", m.Text);
+ session.Commit();
+ }
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs Mon Nov 2 15:53:29 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+ [TestFixture]
+ public class AMQTopicTransactionTest : AMQTransactionTestSupport
+ {
+ public const String CLIENT_ID = "TopicTransactionTest";
+ public const String DESTINATION_NAME = "AMQTopicTransactionTestDestination";
+ public const String SUBSCRIPTION_NAME = "TopicTransactionTest";
+
+ protected override bool Topic
+ {
+ get { return true; }
+ }
+
+ protected override String ClientId
+ {
+ get { return CLIENT_ID; }
+ }
+
+ protected override String Subscription
+ {
+ get { return SUBSCRIPTION_NAME; }
+ }
+
+ protected override String DestinationName
+ {
+ get { return DESTINATION_NAME; }
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTopicTransactionTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs Mon Nov 2 15:53:29 2009
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+ public abstract class AMQTransactionTestSupport : NMSTestSupport
+ {
+ private const int MESSAGE_COUNT = 5;
+ private const string MESSAGE_TEXT = "message";
+
+ private IConnectionFactory connectionFactory;
+ private IConnection connection;
+ private ISession session;
+ private IMessageConsumer consumer;
+ private IMessageProducer producer;
+ private IDestination destination;
+
+ private int batchCount = 10;
+ private int batchSize = 20;
+
+ // for message listener test
+ private LinkedList<IMessage> unackMessages = new LinkedList<IMessage>();
+ private LinkedList<IMessage> ackMessages = new LinkedList<IMessage>();
+ private bool resendPhase;
+
+ [SetUp]
+ public override void SetUp()
+ {
+ base.SetUp();
+
+ this.connectionFactory = new ConnectionFactory();
+ this.resendPhase = false;
+
+ Reconnect();
+ }
+
+ [TearDown]
+ public override void TearDown()
+ {
+ this.session.Close();
+ this.session = null;
+ this.connection.Close();
+ this.connection = null;
+
+ this.unackMessages.Clear();
+ this.ackMessages.Clear();
+
+ base.TearDown();
+ }
+
+ protected abstract bool Topic
+ {
+ get;
+ }
+
+ protected abstract String ClientId
+ {
+ get;
+ }
+
+ protected abstract String Subscription
+ {
+ get;
+ }
+
+ protected abstract String DestinationName
+ {
+ get;
+ }
+
+ public override IConnection CreateConnection()
+ {
+ return this.connectionFactory.CreateConnection();
+ }
+
+ protected void BeginTx()
+ {
+ }
+
+ protected void CommitTx()
+ {
+ session.Commit();
+ }
+
+ protected void RollbackTx()
+ {
+ session.Rollback();
+ }
+
+ [Test]
+ public void TestSendReceiveTransactedBatches()
+ {
+ ITextMessage message = session.CreateTextMessage("Batch IMessage");
+
+ for(int j = 0; j < batchCount; j++)
+ {
+ BeginTx();
+
+ for(int i = 0; i < batchSize; i++)
+ {
+ producer.Send(message);
+ }
+
+ CommitTx();
+
+ BeginTx();
+ for(int i = 0; i < batchSize; i++)
+ {
+ message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message, "Received only " + i + " messages in batch " + j);
+ Assert.AreEqual("Batch IMessage", message.Text);
+ }
+
+ CommitTx();
+ }
+ }
+
+ [Test]
+ public void TestSendRollback()
+ {
+ IMessage[] outbound = new IMessage[]
+ {session.CreateTextMessage("First IMessage"), session.CreateTextMessage("Second IMessage")};
+
+ // sends a message
+ BeginTx();
+ producer.Send(outbound[0]);
+ CommitTx();
+
+ // sends a message that gets rollbacked
+ BeginTx();
+ producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+ RollbackTx();
+
+ // sends a message
+ BeginTx();
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ // receives the first message
+ BeginTx();
+ LinkedList<IMessage> messages = new LinkedList<IMessage>();
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ messages.AddLast(message);
+
+ // receives the second message
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ messages.AddLast(message);
+
+ // validates that the rollbacked was not consumed
+ CommitTx();
+ IMessage[] inbound = new IMessage[messages.Count];
+ messages.CopyTo(inbound, 0);
+ AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+ }
+
+ [Test]
+ public void TestSendSessionClose()
+ {
+ IMessage[] outbound = new IMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage")};
+
+ // sends a message
+ BeginTx();
+ producer.Send(outbound[0]);
+ CommitTx();
+
+ // sends a message that gets rollbacked
+ BeginTx();
+ producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+ consumer.Close();
+
+ ReconnectSession();
+
+ // sends a message
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ // receives the first message
+ LinkedList<IMessage> messages = new LinkedList<IMessage>();
+ BeginTx();
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ messages.AddLast(message);
+
+ // receives the second message
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message);
+ messages.AddLast(message);
+
+ // validates that the rollbacked was not consumed
+ CommitTx();
+ IMessage[] inbound = new IMessage[messages.Count];
+ messages.CopyTo(inbound, 0);
+ AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+ }
+
+ [Test]
+ public void TestSendSessionAndConnectionClose()
+ {
+ IMessage[] outbound = new IMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage")};
+
+ // sends a message
+ BeginTx();
+ producer.Send(outbound[0]);
+ CommitTx();
+
+ // sends a message that gets rollbacked
+ BeginTx();
+ producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+ consumer.Close();
+ session.Close();
+
+ Reconnect();
+
+ // sends a message
+ BeginTx();
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ // receives the first message
+ LinkedList<IMessage> messages = new LinkedList<IMessage>();
+ BeginTx();
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ messages.AddLast(message);
+
+ // receives the second message
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message);
+ messages.AddLast(message);
+
+ // validates that the rollbacked was not consumed
+ CommitTx();
+ IMessage[] inbound = new IMessage[messages.Count];
+ messages.CopyTo(inbound, 0);
+ AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+ }
+
+ [Test]
+ public void TestReceiveRollback()
+ {
+ IMessage[] outbound = new IMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage")};
+
+ // lets consume any outstanding messages from prev test runs
+ BeginTx();
+ bool needCommit = false;
+ while(consumer.ReceiveNoWait() != null)
+ {
+ needCommit = true;
+ }
+
+ if(needCommit)
+ {
+ CommitTx();
+ }
+
+ // sent both messages
+ BeginTx();
+ producer.Send(outbound[0]);
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ LinkedList<IMessage> messages = new LinkedList<IMessage>();
+ BeginTx();
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ messages.AddLast(message);
+ AssertEquals(outbound[0], message);
+ CommitTx();
+
+ // Rollback so we can get that last message again.
+ BeginTx();
+ message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ AssertEquals(outbound[1], message);
+ RollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ BeginTx();
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message, "Should have re-received the message again!");
+ messages.AddLast(message);
+ CommitTx();
+
+ IMessage[] inbound = new IMessage[messages.Count];
+ messages.CopyTo(inbound, 0);
+ AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+ }
+
+ [Test]
+ public void TestReceiveTwoThenRollback()
+ {
+ IMessage[] outbound = new IMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage")};
+
+ // lets consume any outstanding messages from prev test runs
+ BeginTx();
+ bool needCommit = false;
+ while(consumer.ReceiveNoWait() != null)
+ {
+ needCommit = true;
+ }
+
+ if(needCommit)
+ {
+ CommitTx();
+ }
+
+ BeginTx();
+ producer.Send(outbound[0]);
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ LinkedList<IMessage> messages = new LinkedList<IMessage>();
+ BeginTx();
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ AssertEquals(outbound[0], message);
+
+ message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ AssertEquals(outbound[1], message);
+ RollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ BeginTx();
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message, "Should have re-received the first message again!");
+ messages.AddLast(message);
+ AssertEquals(outbound[0], message);
+ message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+ Assert.IsNotNull(message, "Should have re-received the first message again!");
+ messages.AddLast(message);
+ AssertEquals(outbound[1], message);
+
+ Assert.IsNull(consumer.ReceiveNoWait());
+ CommitTx();
+
+ IMessage[] inbound = new IMessage[messages.Count];
+ messages.CopyTo(inbound, 0);
+ AssertTextMessagesEqual(outbound, inbound, "Rollback did not work.");
+ }
+
+ [Test]
+ public void TestSendReceiveWithPrefetchOne() {
+ SetPrefetchToOne();
+ ReconnectSession();
+
+ IMessage[] outbound = new IMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage"),
+ session.CreateTextMessage("Third IMessage"),
+ session.CreateTextMessage("Fourth IMessage")};
+
+ BeginTx();
+ for(int i = 0; i < outbound.Length; i++)
+ {
+ // sends a message
+ producer.Send(outbound[i]);
+ }
+ CommitTx();
+
+ // receives the first message
+ BeginTx();
+
+ for(int i = 0; i < outbound.Length; i++)
+ {
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ }
+
+ // validates that the rollbacked was not consumed
+ CommitTx();
+ }
+
+ [Test]
+ public void TestReceiveTwoThenRollbackManyTimes()
+ {
+ for(int i = 0; i < 5; i++)
+ {
+ TestReceiveTwoThenRollback();
+ }
+ }
+
+ [Test]
+ public void TestSendRollbackWithPrefetchOfOne()
+ {
+ SetPrefetchToOne();
+ TestSendRollback();
+ }
+
+ [Test]
+ public void TestReceiveRollbackWithPrefetchOfOne()
+ {
+ SetPrefetchToOne();
+ TestReceiveRollback();
+ }
+
+ [Test]
+ public void TestCloseConsumerBeforeCommit()
+ {
+ ITextMessage[] outbound = new ITextMessage[] {
+ session.CreateTextMessage("First IMessage"),
+ session.CreateTextMessage("Second IMessage")};
+
+ // lets consume any outstanding messages from prev test runs
+ BeginTx();
+ bool needCommit = false;
+ while(consumer.ReceiveNoWait() != null)
+ {
+ needCommit = true;
+ }
+
+ if(needCommit)
+ {
+ CommitTx();
+ }
+
+ // sends the messages
+ BeginTx();
+ producer.Send(outbound[0]);
+ producer.Send(outbound[1]);
+ CommitTx();
+
+ BeginTx();
+ ITextMessage message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.AreEqual(outbound[0].Text, message.Text);
+ // Close the consumer before the Commit. This should not cause the
+ // received message to Rollback.
+ consumer.Close();
+ CommitTx();
+
+ // Create a new consumer
+ consumer = CreateMessageConsumer();
+
+ BeginTx();
+ message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ Assert.AreEqual(outbound[1].Text, message.Text);
+ CommitTx();
+ }
+
+ protected void Reconnect()
+ {
+ if(this.connection != null)
+ {
+ // Close the prev connection.
+ this.connection.Close();
+ this.connection = null;
+ }
+
+ this.session = null;
+ this.connection = this.connectionFactory.CreateConnection();
+
+ if(this.ClientId != null)
+ {
+ connection.ClientId = this.ClientId;
+ }
+
+ ReconnectSession();
+ this.connection.Start();
+ }
+
+ protected void ReconnectSession()
+ {
+ if(this.session != null)
+ {
+ this.session.Close();
+ }
+
+ this.session = this.connection.CreateSession(AcknowledgementMode.Transactional);
+
+ if( this.Topic == true )
+ {
+ this.destination = this.session.GetTopic(this.DestinationName);
+ }
+ else
+ {
+ this.destination = this.session.GetQueue(this.DestinationName);
+ }
+
+ this.producer = this.session.CreateProducer(destination);
+ this.consumer = CreateMessageConsumer();
+ }
+
+ protected IMessageConsumer CreateMessageConsumer()
+ {
+ if(this.Subscription != null)
+ {
+ return this.session.CreateDurableConsumer((ITopic) destination, Subscription, null, false);
+ }
+ else
+ {
+ return this.session.CreateConsumer(destination);
+ }
+ }
+
+ protected void SetPrefetchToOne()
+ {
+ GetPrefetchPolicy().SetAll(1);
+ }
+
+ protected PrefetchPolicy GetPrefetchPolicy()
+ {
+ return ((Connection) connection).PrefetchPolicy;
+ }
+
+ [Test]
+ public void TestMessageListener()
+ {
+ // Send messages
+ for(int i = 0; i < MESSAGE_COUNT; i++)
+ {
+ producer.Send(session.CreateTextMessage(MESSAGE_TEXT + i));
+ }
+
+ CommitTx();
+ consumer.Listener += new MessageListener(OnMessage);
+
+ // wait receive
+ WaitReceiveUnack();
+ Assert.AreEqual(unackMessages.Count, MESSAGE_COUNT);
+
+ // resend phase
+ WaitReceiveAck();
+ Assert.AreEqual(ackMessages.Count, MESSAGE_COUNT);
+
+ // should no longer re-receive
+ consumer.Listener -= new MessageListener(OnMessage);
+ Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(500)));
+ Reconnect();
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ if(!resendPhase)
+ {
+ unackMessages.AddLast(message);
+ if(unackMessages.Count == MESSAGE_COUNT)
+ {
+ try
+ {
+ RollbackTx();
+ resendPhase = true;
+ }
+ catch
+ {
+ }
+ }
+ }
+ else
+ {
+ ackMessages.AddLast(message);
+ if(ackMessages.Count == MESSAGE_COUNT)
+ {
+ try
+ {
+ CommitTx();
+ }
+ catch
+ {
+ }
+ }
+ }
+ }
+
+ private void WaitReceiveUnack()
+ {
+ for(int i = 0; i < 100 && !resendPhase; i++)
+ {
+ Thread.Sleep(100);
+ }
+
+ Assert.IsTrue(resendPhase);
+ }
+
+ private void WaitReceiveAck()
+ {
+ for(int i = 0; i < 100 && ackMessages.Count < MESSAGE_COUNT; i++)
+ {
+ Thread.Sleep(100);
+ }
+
+ Assert.IsFalse(ackMessages.Count < MESSAGE_COUNT);
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQTransactionTestSupport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs?rev=831931&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs Mon Nov 2 15:53:29 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Policies;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+ [TestFixture]
+ public class MessageListenerRedeliveryTest : NMSTestSupport
+ {
+ private Connection connection;
+ private int counter;
+ private ISession session;
+
+ [SetUp]
+ public override void SetUp()
+ {
+ this.connection = (Connection) CreateConnection();
+ this.connection.RedeliveryPolicy = GetRedeliveryPolicy();
+
+ this.counter = 0;
+ }
+
+ [TearDown]
+ public override void TearDown()
+ {
+ this.session = null;
+
+ if(this.connection != null)
+ {
+ this.connection.Close();
+ this.connection = null;
+ }
+
+ base.TearDown();
+ }
+
+ protected IRedeliveryPolicy GetRedeliveryPolicy()
+ {
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.InitialRedeliveryDelay = 1000;
+ redeliveryPolicy.MaximumRedeliveries = 3;
+ redeliveryPolicy.BackOffMultiplier = (short)2;
+ redeliveryPolicy.UseExponentialBackOff = true;
+ return redeliveryPolicy;
+ }
+
+ private void OnMessageListener(IMessage message)
+ {
+ counter++;
+ if(this.counter <= 4)
+ {
+ session.Rollback();
+ }
+ else
+ {
+ message.Acknowledge();
+ session.Commit();
+ }
+ }
+
+ [Test]
+ public void TestQueueRollbackConsumerListener()
+ {
+ connection.Start();
+
+ this.session = connection.CreateSession(AcknowledgementMode.Transactional);
+ ITemporaryQueue queue = session.CreateTemporaryQueue();
+ IMessageProducer producer = session.CreateProducer(queue);
+ IMessage message = session.CreateTextMessage("Test Message");
+ producer.Send(message);
+ session.Commit();
+
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ consumer.Listener += new MessageListener(OnMessageListener);
+
+ Thread.Sleep(500);
+
+ // first try.. should get 2 since there is no delay on the
+ // first redeliver..
+ Assert.AreEqual(2, counter);
+
+ Thread.Sleep(1000);
+
+ // 2nd redeliver (redelivery after 1 sec)
+ Assert.AreEqual(3, counter);
+
+ Thread.Sleep(2000);
+
+ // 3rd redeliver (redelivery after 2 seconds) - it should give up after
+ // that
+ Assert.AreEqual(4, counter);
+
+ // create new message
+ producer.Send(session.CreateTextMessage("Test Message Again"));
+ session.Commit();
+
+ Thread.Sleep(500);
+
+ // it should be committed, so no redelivery
+ Assert.AreEqual(5, counter);
+
+ Thread.Sleep(1500);
+
+ // no redelivery, counter should still be 5
+ Assert.AreEqual(5, counter);
+
+ session.Close();
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageListenerRedeliveryTest.cs
------------------------------------------------------------------------------
svn:eol-style = native