You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/02/22 18:00:47 UTC

svn commit: r1073408 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp: DtcBasicTransactionsTest.cs DtcConsumerTransactionsTest.cs DtcProducerTransactionsTest.cs DtcTransactionsTestSupport.cs

Author: tabish
Date: Tue Feb 22 17:00:46 2011
New Revision: 1073408

URL: http://svn.apache.org/viewvc?rev=1073408&view=rev
Log:
Adds a suite of tests for verifying the functionality of MSDTC based transaction support.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcBasicTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        [ExpectedException("Apache.NMS.NMSException")]
+        public void TestSessionCreateFailsWithInvalidLogLocation()
+        {
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                NetTxConnection con = connection as NetTxConnection;
+                NetTxRecoveryPolicy policy = con.RecoveryPolicy;
+                (policy.RecoveryLogger as RecoveryFileLogger).Location = nonExistantPath; // redirect the recovery log before the session is created
+                connection.CreateNetTxSession(); // presumably where the expected NMSException is raised - TODO confirm
+            }
+        }
+
+        [Test]
+        public void TestTransactedDBReadAndProduce()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has committed the transaction (table emptied)
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestTransacteDequeueAndDbWrite()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has committed the transaction and stored all messages
+            VerifyDatabaseTableIsFull();
+
+            // check no messages are present in the queue after commit.
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcConsumerTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        public void TestRecoveryAfterCommitFailsBeforeSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoveryAfterCommitFailsAfterSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestIterativeTransactedConsume()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue(5 * MSG_COUNT);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has committed all five transactions
+            VerifyDatabaseTableIsFull(5 * MSG_COUNT);
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+
+        [Test]
+        public void TestConsumeWithDBInsertLogLocation()
+        {
+            const string logLocation = @".\RecoveryDir";
+            const string newConnectionUri =
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLogger.Location=" + logLocation;
+
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            if (Directory.Exists(logLocation))
+            {
+                Directory.Delete(logLocation, true);
+            }
+
+            Directory.CreateDirectory(logLocation);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            Assert.AreEqual(1, Directory.GetFiles(logLocation).Length);
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are NOT present in the queue
+            VerifyBrokerQueueCount(0, newConnectionUri);
+
+            Assert.AreEqual(0, Directory.GetFiles(logLocation).Length);
+        }
+
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are recovered and present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook; // NOTE(review): hook is attached before Start() here, unlike the commit/rollback tests - confirm intentional
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // Messages are visible since no prepare sent
+            VerifyBrokerQueueCountNoRecovery();
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionAfterPrepareSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook; // NOTE(review): hook is attached before Start() here, unlike the commit/rollback tests - confirm intentional
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // not visible yet because it must be rolled back
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcProducerTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        public void TestRecoverAfterFailOnTransactionCommit()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionPostCommitSend()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should have been committed
+            VerifyBrokerQueueCountNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogDeleted()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            NetTxConnectionFactory netTxFactory = factory as NetTxConnectionFactory;
+            RecoveryFileLogger logger = netTxFactory.RecoveryPolicy.RecoveryLogger as RecoveryFileLogger;
+            string logDirectory = logger.Location;
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // delete all recovery files
+            foreach (string file in Directory.GetFiles(logDirectory, "*.bin"))
+            {
+                File.Delete(file);
+            }
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are NOT present in the queue because recovery file has been deleted
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogWriteFails()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            const string newConnectionUri = 
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLoggerType=harness";
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                IRecoveryLogger logger = (connection as NetTxConnection).RecoveryPolicy.RecoveryLogger;
+                Assert.IsNotNull(logger);
+                RecoveryLoggerHarness harness = logger as RecoveryLoggerHarness;
+                Assert.IsNotNull(harness);
+
+                harness.PreLogRecoveryInfoEvent += FailOnPreLogRecoveryHook;
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has not committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has not committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionDuringPrepareSend()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction (table is still full)
+            VerifyDatabaseTableIsFull();
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook;
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction (table is still full)
+            VerifyDatabaseTableIsFull();
+
+            // before recovering, messages should NOT be present in the queue
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // check messages are not present in the queue after recover
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestIterativeTransactedProduceWithDBDelete()
+        {
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                PrepareDatabase();
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                PrepareDatabase();
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                PrepareDatabase();
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                PrepareDatabase();
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                PrepareDatabase();
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+            }
+
+            // verify sql server has committed every transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages from all five iterations are present in the queue
+            VerifyBrokerQueueCount(MSG_COUNT * 5);
+        }
+
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Transactions;
+using System.Threading;
+
+using NUnit.Framework;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using System.Data.SqlClient;
+using System.Collections;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    // PREREQUISITES to run these tests:
+    // - A local instance of SQL Server 2008 running, with a database (e.g. TestDB) that
+    //   has a table (e.g. TestTable) with a single column (e.g. TestID) of type INT.
+    //   The tests default to using an SQL connection string with a user id of
+    //   'user' and the password 'password'.
+    // - AMQ Server 5.4.2+
+    // - NMS 1.5+
+    //
+    // IMPORTANT
+    // Because recovery can be performed only once per process, these tests cannot
+    // be run sequentially in the NUnit GUI or NUnit Console runner. Run them one at
+    // a time from the console, or use a tool like the ReSharper plugin for Visual
+    // Studio.
+    //
+
+    public class DtcTransactionsTestSupport : NMSTestSupport
+    {
+        protected const int MSG_COUNT = 5;
+        protected string nonExistantPath;
+        
+        private ITrace oldTracer;
+
+        protected const string sqlConnectionString =
+            "Data Source=localhost;Initial Catalog=TestDB;User ID=user;Password=password";
+        protected const string testTable = "TestTable";
+        protected const string testColumn = "TestID";
+        protected const string testQueueName = "TestQueue";
+        protected const string connectionURI = "tcpfaulty://${activemqhost}:61616";
+
+        [SetUp]
+        public override void SetUp()
+        {
+            // Remember the tracer that is active now so TearDown can put it back.
+            this.oldTracer = Tracer.Trace;
+
+            // Path made of the current working directory plus a fresh GUID.
+            string workingDirectory = Directory.GetCurrentDirectory();
+            this.nonExistantPath = workingDirectory + Path.DirectorySeparatorChar + Guid.NewGuid();
+
+            base.SetUp();
+
+            // Start every test with the test queue drained.
+            PurgeDestination();
+        }
+
+        [TearDown]
+        public override void TearDown()
+        {
+            // Removes the test queue, runs the base tear-down and restores the tracer
+            // captured in SetUp. The nested try/finally guarantees that a failure in
+            // an earlier step cannot skip the later ones, so a broken broker
+            // connection does not leave a test-specific tracer installed for the
+            // tests that follow.
+            try
+            {
+                DeleteDestination();
+            }
+            finally
+            {
+                try
+                {
+                    base.TearDown();
+                }
+                finally
+                {
+                    Tracer.Trace = this.oldTracer;
+                }
+            }
+        }
+
+        // Connection exception listener used by the tests: it only traces the
+        // message of the reported exception at debug level and takes no other action.
+        protected void OnException(Exception ex)
+        {
+            Tracer.Debug("Test Driver received Error Notification: " + ex.Message);
+        }
+
+        #region Database Utility Methods
+
+        protected static void PrepareDatabase()
+        {
+            // Resets the test table and seeds it with MSG_COUNT rows whose single
+            // column holds the values 0 .. MSG_COUNT - 1.
+            string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                // Start from an empty table.
+                using (SqlCommand truncate = new SqlCommand(truncateSql, db))
+                {
+                    truncate.ExecuteNonQuery();
+                }
+
+                // One INSERT per row.
+                int row = 0;
+                while (row < MSG_COUNT)
+                {
+                    string insertSql = string.Format("INSERT INTO {0} ({1}) values ({2})", testTable, testColumn, row);
+
+                    using (SqlCommand insert = new SqlCommand(insertSql, db))
+                    {
+                        insert.ExecuteNonQuery();
+                    }
+
+                    ++row;
+                }
+
+                db.Close();
+            }
+        }
+
+        protected static void PurgeDatabase()
+        {
+            // Empties the test table without re-seeding it.
+            string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                using (SqlCommand truncate = new SqlCommand(truncateSql, db))
+                {
+                    truncate.ExecuteNonQuery();
+                }
+
+                db.Close();
+            }
+        }
+
+        protected static IList ExtractDataSet()
+        {
+            // Reads every value of the test column and returns one
+            // "Hello World <value>" string per row.
+            string selectSql = string.Format("SELECT {0} FROM {1}", testColumn, testTable);
+            IList bodies = new ArrayList();
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                using (SqlCommand query = new SqlCommand(selectSql, db))
+                {
+                    using (SqlDataReader rows = query.ExecuteReader())
+                    {
+                        while (rows.Read())
+                        {
+                            // The cast requires the first column to be a boxed Int32.
+                            int id = (int)rows[0];
+                            bodies.Add("Hello World " + id);
+                        }
+                    }
+                }
+            }
+
+            return bodies;
+        }
+
+        protected static void VerifyDatabaseTableIsEmpty()
+        {
+            // Asserts that the test table currently holds no rows.
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is IDisposable; scope it so it is released even when the
+                // query or the assertion below throws.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    // The cast requires the scalar result to be a boxed Int32.
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(0, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+        // Convenience overload: asserts that the test table holds exactly MSG_COUNT rows.
+        protected static void VerifyDatabaseTableIsFull()
+        {
+            VerifyDatabaseTableIsFull(MSG_COUNT);
+        }
+
+        protected static void VerifyDatabaseTableIsFull(int expected)
+        {
+            // Asserts that the test table holds exactly `expected` rows.
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is IDisposable; scope it so it is released even when the
+                // query or the assertion below throws.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    // The cast requires the scalar result to be a boxed Int32.
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(expected, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+        #endregion
+
+        #region Destination Utility Methods
+
+        protected static void DeleteDestination()
+        {
+            // Best-effort removal of the test queue from the broker. A failure to
+            // delete is traced rather than thrown, so clean-up problems do not mask
+            // the outcome of the test that just ran.
+            IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (Connection connection = factory.CreateConnection() as Connection)
+            {
+                // The `as` cast yields null when the factory hands back another
+                // IConnection implementation; fail with a clear message instead of
+                // a NullReferenceException on the next line.
+                Assert.IsNotNull(connection, "factory did not return an ActiveMQ Connection");
+
+                using (ISession session = connection.CreateSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+                    try
+                    {
+                        connection.DeleteDestination(queue);
+                    }
+                    catch (Exception ex)
+                    {
+                        // Still best-effort, but leave a trace so a stuck queue can be diagnosed.
+                        Tracer.Debug("Failed to delete destination " + testQueueName + ": " + ex.Message);
+                    }
+                }
+            }
+        }
+
+        protected static void PurgeDestination()
+        {
+            // Drains the test queue by consuming messages until a receive times out
+            // (3 seconds without a message).
+            IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = factory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(testQueueName)))
+                {
+                    TimeSpan timeout = TimeSpan.FromMilliseconds(3000);
+
+                    // Receive again on every pass. The previous version received once
+                    // before the loop and never updated the variable, so a single
+                    // queued message made the loop spin forever.
+                    IMessage recvd;
+                    while ((recvd = consumer.Receive(timeout)) != null)
+                    {
+                        Tracer.Debug("Setup Purged Message: " + recvd);
+                    }
+                }
+            }
+        }
+
+        // Convenience overload: drains the test queue and refills it with MSG_COUNT messages.
+        protected static void PurgeAndFillQueue()
+        {
+            PurgeAndFillQueue(MSG_COUNT);
+        }
+
+        protected static void PurgeAndFillQueue(int msgCount)
+        {
+            // Drains the test queue, then sends msgCount persistent text messages
+            // whose bodies are "0", "1", ... in send order.
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            {
+                plainConnection.Start();
+
+                using (ISession fillSession = plainConnection.CreateSession())
+                {
+                    IQueue target = fillSession.GetQueue(testQueueName);
+
+                    // Consume until a receive comes back empty after two seconds.
+                    using (IMessageConsumer drainer = fillSession.CreateConsumer(target))
+                    {
+                        IMessage drained;
+                        do
+                        {
+                            drained = drainer.Receive(TimeSpan.FromMilliseconds(2000));
+                        }
+                        while (drained != null);
+                    }
+
+                    // Refill with the requested number of messages.
+                    using (IMessageProducer sender = fillSession.CreateProducer(target))
+                    {
+                        sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        int index = 0;
+                        while (index < msgCount)
+                        {
+                            sender.Send(fillSession.CreateTextMessage(index.ToString()));
+                            index++;
+                        }
+                    }
+                }
+            }
+        }
+
+        #endregion
+
+        #region Broker Queue State Validation Routines
+
+        // Convenience overload: expects exactly MSG_COUNT messages on the test queue.
+        protected static void VerifyBrokerQueueCountNoRecovery()
+        {
+            VerifyBrokerQueueCountNoRecovery(MSG_COUNT);
+        }
+
+        protected static void VerifyBrokerQueueCountNoRecovery(int expectedNumberOfMessages)
+        {
+            // Browses the test queue over a plain ConnectionFactory connection (not a
+            // NetTx one) and asserts it holds exactly expectedNumberOfMessages messages.
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            using (ISession browseSession = plainConnection.CreateSession())
+            using (IQueueBrowser queueBrowser = browseSession.CreateBrowser(browseSession.GetQueue(testQueueName)))
+            {
+                // The connection is started only once the browser exists.
+                plainConnection.Start();
+
+                int seen = 0;
+                for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); ++seen)
+                {
+                    Assert.IsNotNull(cursor.Current as IMessage, "message is not in the queue !");
+                }
+
+                // The number of browsed messages must match the expectation.
+                Assert.AreEqual(expectedNumberOfMessages, seen);
+            }
+        }
+
+        // Convenience overload: expects MSG_COUNT messages, using the default connectionURI.
+        protected static void VerifyBrokerQueueCount()
+        {
+            VerifyBrokerQueueCount(MSG_COUNT, connectionURI);
+        }
+
+        // Convenience overload: expects expectedCount messages, using the default connectionURI.
+        protected static void VerifyBrokerQueueCount(int expectedCount)
+        {
+            VerifyBrokerQueueCount(expectedCount, connectionURI);
+        }
+
+        // Convenience overload: expects MSG_COUNT messages, connecting with the given URI.
+        protected static void VerifyBrokerQueueCount(string connectionUri)
+        {
+            VerifyBrokerQueueCount(MSG_COUNT, connectionUri);
+        }
+
+        protected static void VerifyBrokerQueueCount(int expectedCount, string connectionUri)
+        {
+            // Browses the test queue over a NetTx connection created from connectionUri
+            // and asserts that it holds exactly expectedCount messages.
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionUri));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            using (INetTxSession browseSession = txConnection.CreateNetTxSession())
+            using (IQueueBrowser queueBrowser = browseSession.CreateBrowser(browseSession.GetQueue(testQueueName)))
+            {
+                // The connection is started only once the browser exists.
+                txConnection.Start();
+
+                int seen = 0;
+                for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); ++seen)
+                {
+                    Assert.IsNotNull(cursor.Current as IMessage, "message is not in the queue !");
+                }
+
+                // The number of browsed messages must match the expectation.
+                Assert.AreEqual(expectedCount, seen);
+            }
+        }
+
+        // Asserts the test queue is empty, checked through VerifyBrokerQueueCountNoRecovery.
+        protected static void VerifyNoMessagesInQueueNoRecovery()
+        {
+            VerifyBrokerQueueCountNoRecovery(0);
+        }
+
+        // Asserts the test queue is empty, checked through VerifyBrokerQueueCount.
+        protected static void VerifyNoMessagesInQueue()
+        {
+            VerifyBrokerQueueCount(0);
+        }
+
+        #endregion
+
+        #region Transport Hooks for controlling failure point.
+
+        public void FailOnPrepareTransportHook(ITransport transport, Command command)
+        {
+            // Transport hook: ignores every command except a TransactionInfo of type
+            // Prepare, for which it pauses one second and then throws.
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null || txInfo.Type != (byte)TransactionType.Prepare)
+            {
+                return;
+            }
+
+            Thread.Sleep(1000);
+            Tracer.Debug("Throwing Error on Prepare.");
+            throw new Exception("Error writing Prepare command");
+        }
+
+        public void FailOnRollbackTransportHook(ITransport transport, Command command)
+        {
+            // Transport hook: ignores every command except a TransactionInfo of type
+            // Rollback, for which it throws.
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null || txInfo.Type != (byte)TransactionType.Rollback)
+            {
+                return;
+            }
+
+            Tracer.Debug("Throwing Error on Rollback.");
+            throw new Exception("Error writing Rollback command");
+        }
+
+        public void FailOnCommitTransportHook(ITransport transport, Command command)
+        {
+            // Transport hook: ignores every command except a TransactionInfo of type
+            // CommitTwoPhase, for which it throws.
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null || txInfo.Type != (byte)TransactionType.CommitTwoPhase)
+            {
+                return;
+            }
+
+            Tracer.Debug("Throwing Error on Commit.");
+            throw new Exception("Error writing Commit command");
+        }
+
+        #endregion
+
+        #region Recovery Harness Hooks for controlling failure conditions
+
+        // Recovery hook that always fails: it traces a debug message and throws,
+        // regardless of the transaction id or recovery data passed in (both unused).
+        public void FailOnPreLogRecoveryHook(XATransactionId xid, byte[] recoveryInformatio)
+        {
+            Tracer.Debug("Throwing Error before the Recovery Information is Logged.");
+            throw new Exception("Intentional Error Logging Recovery Information");
+        }
+
+        #endregion
+
+        #region Produce Messages use cases
+
+        protected static void ReadFromDbAndProduceToQueueWithCommit(INetTxConnection connection)
+        {
+            // Sends one message per database row and deletes those rows inside a
+            // single TransactionScope that is marked complete. Any exception raised
+            // along the way is traced and swallowed.
+            IList bodies = ExtractDataSet();
+            string deleteSql = string.Format("DELETE FROM {0}", testTable);
+
+            using (INetTxSession txSession = connection.CreateNetTxSession())
+            {
+                IQueue target = txSession.GetQueue(testQueueName);
+
+                try
+                {
+                    using (IMessageProducer sender = txSession.CreateProducer(target))
+                    {
+                        sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        using (TransactionScope tx = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        {
+                            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                            {
+                                db.Open();
+
+                                Assert.IsNotNull(Transaction.Current);
+
+                                Tracer.DebugFormat("Sending {0} messages to Broker in this TX", bodies.Count);
+                                for (int i = 0; i < bodies.Count; ++i)
+                                {
+                                    string body = (string)bodies[i];
+                                    sender.Send(txSession.CreateTextMessage(body));
+                                }
+
+                                using (SqlCommand delete = new SqlCommand(deleteSql, db))
+                                {
+                                    int deleted = delete.ExecuteNonQuery();
+                                    Assert.AreEqual(bodies.Count, deleted, "wrong number of rows deleted");
+                                }
+
+                                // Vote to commit; the scope is disposed after the SQL connection.
+                                tx.Complete();
+                            }
+                        }
+                    }
+                }
+                catch (Exception e) // exception thrown in TransactionContext.Commit(Enlistment enlistment)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        protected static void ReadFromDbAndProduceToQueueWithScopeAborted(INetTxConnection connection)
+        {
+            // Same steps as the committing variant, but the TransactionScope is
+            // disposed without Complete() ever being called. Any exception raised
+            // along the way is traced and swallowed.
+            IList bodies = ExtractDataSet();
+            string deleteSql = string.Format("DELETE FROM {0}", testTable);
+
+            using (INetTxSession txSession = connection.CreateNetTxSession())
+            {
+                IQueue target = txSession.GetQueue(testQueueName);
+
+                try
+                {
+                    using (IMessageProducer sender = txSession.CreateProducer(target))
+                    {
+                        sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        using (TransactionScope tx = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        {
+                            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                            {
+                                db.Open();
+
+                                Assert.IsNotNull(Transaction.Current);
+
+                                Tracer.DebugFormat("Sending {0} messages to Broker in this TX", bodies.Count);
+                                for (int i = 0; i < bodies.Count; ++i)
+                                {
+                                    string body = (string)bodies[i];
+                                    sender.Send(txSession.CreateTextMessage(body));
+                                }
+
+                                using (SqlCommand delete = new SqlCommand(deleteSql, db))
+                                {
+                                    int deleted = delete.ExecuteNonQuery();
+                                    Assert.AreEqual(bodies.Count, deleted, "wrong number of rows deleted");
+                                }
+
+                                // Deliberately no tx.Complete() here.
+                            }
+                        }
+                    }
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        #endregion
+
+        #region Consume Messages Use Cases
+
+        protected static void ReadFromQueueAndInsertIntoDbWithCommit(INetTxConnection connection)
+        {
+            // Receives MSG_COUNT text messages and inserts each body (parsed as an
+            // integer) into the test table inside a single TransactionScope that is
+            // marked complete. Any exception raised along the way is traced and
+            // swallowed.
+            using (INetTxSession txSession = connection.CreateNetTxSession())
+            {
+                IQueue source = txSession.GetQueue(testQueueName);
+
+                try
+                {
+                    using (IMessageConsumer receiver = txSession.CreateConsumer(source))
+                    using (TransactionScope tx = new TransactionScope(TransactionScopeOption.RequiresNew))
+                    using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                    using (SqlCommand insert = new SqlCommand())
+                    {
+                        db.Open();
+                        insert.Connection = db;
+
+                        Assert.IsNotNull(Transaction.Current);
+
+                        int remaining = MSG_COUNT;
+                        while (remaining-- > 0)
+                        {
+                            // Receive() is called without a timeout.
+                            ITextMessage received = receiver.Receive() as ITextMessage;
+                            Assert.IsNotNull(received, "missing message");
+
+                            int id = Convert.ToInt32(received.Text);
+                            insert.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, id);
+                            insert.ExecuteNonQuery();
+                        }
+
+                        // Vote to commit before the scope is disposed.
+                        tx.Complete();
+                    }
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        protected static void ReadFromQueueAndInsertIntoDbWithScopeAborted(INetTxConnection connection)
+        {
+            // Same steps as the committing variant, but the TransactionScope is
+            // disposed without Complete() ever being called. Any exception raised
+            // along the way is traced and swallowed.
+            using (INetTxSession txSession = connection.CreateNetTxSession())
+            {
+                IQueue source = txSession.GetQueue(testQueueName);
+
+                try
+                {
+                    using (IMessageConsumer receiver = txSession.CreateConsumer(source))
+                    using (TransactionScope tx = new TransactionScope(TransactionScopeOption.RequiresNew))
+                    using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                    using (SqlCommand insert = new SqlCommand())
+                    {
+                        db.Open();
+                        insert.Connection = db;
+
+                        Assert.IsNotNull(Transaction.Current);
+
+                        int remaining = MSG_COUNT;
+                        while (remaining-- > 0)
+                        {
+                            // Receive() is called without a timeout.
+                            ITextMessage received = receiver.Receive() as ITextMessage;
+                            Assert.IsNotNull(received, "missing message");
+
+                            int id = Convert.ToInt32(received.Text);
+                            insert.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, id);
+                            insert.ExecuteNonQuery();
+                        }
+
+                        // Deliberately no tx.Complete() here.
+                    }
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        #endregion
+    }
+}