You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/02/22 18:00:47 UTC
svn commit: r1073408 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp:
DtcBasicTransactionsTest.cs DtcConsumerTransactionsTest.cs
DtcProducerTransactionsTest.cs DtcTransactionsTestSupport.cs
Author: tabish
Date: Tue Feb 22 17:00:46 2011
New Revision: 1073408
URL: http://svn.apache.org/viewvc?rev=1073408&view=rev
Log:
Adds a suite of tests for verifying the functionality of MSDTC based transaction support.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcBasicTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+ [TestFixture]
+ [Category("Manual")]
+ class DtcBasicTransactionsTest : DtcTransactionsTestSupport
+ {
+        // Points the recovery file logger at a directory that does not exist (see
+        // nonExistantPath, built in SetUp) and expects session creation to fail
+        // with an NMSException.
+        [Test]
+        [ExpectedException("Apache.NMS.NMSException")]
+        public void TestSessionCreateFailsWithInvalidLogLocation()
+        {
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                // Fail with a readable assertion rather than a NullReferenceException when
+                // the connection or its logger is not of the expected concrete type.
+                NetTxConnection con = connection as NetTxConnection;
+                Assert.IsNotNull(con, "connection is not a NetTxConnection");
+
+                RecoveryFileLogger fileLogger = con.RecoveryPolicy.RecoveryLogger as RecoveryFileLogger;
+                Assert.IsNotNull(fileLogger, "recovery logger is not a RecoveryFileLogger");
+
+                fileLogger.Location = nonExistantPath;
+
+                // Expected to throw because the log location is invalid.
+                connection.CreateNetTxSession();
+            }
+        }
+
+        // Runs ReadFromDbAndProduceToQueueWithCommit against a freshly seeded table,
+        // then checks the table is empty and the broker queue holds the messages.
+        [Test]
+        public void TestTransactedDBReadAndProduce()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                // Pause before the connection is disposed.
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: the committed transaction must have left no rows behind.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the produced messages should be on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Runs ReadFromQueueAndInsertIntoDbWithCommit against a filled queue and an
+        // empty table, then checks the table holds MSG_COUNT rows and the queue is drained.
+        [Test]
+        public void TestTransacteDequeueAndDbWrite()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                // Pause before the connection is disposed.
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: every consumed message must have been stored.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: no messages should remain on the queue after the commit.
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+ [TestFixture]
+ [Category("Manual")]
+ class DtcConsumerTransactionsTest : DtcTransactionsTestSupport
+ {
+        // Consumes from the queue into the database while FailOnCommitTransportHook is
+        // attached to the faulty transport's OnewayCommandPreProcessor event, then
+        // checks the database and the queue.
+        [Test]
+        public void TestRecoveryAfterCommitFailsBeforeSent()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                Thread.Sleep(1000);
+            }
+
+            // Broker commit should not have gone through; no messages are expected to
+            // be visible before recovery.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // SQL Server side: the table must hold MSG_COUNT rows.
+            VerifyDatabaseTableIsFull();
+
+            // The queue is expected to be empty here as well.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Same scenario as the "before sent" variant, but FailOnCommitTransportHook is
+        // attached to OnewayCommandPostProcessor instead of the pre-processor event.
+        [Test]
+        public void TestRecoveryAfterCommitFailsAfterSent()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPostProcessor += FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                Thread.Sleep(1000);
+            }
+
+            // Broker commit should have gone through; the queue is expected to be empty
+            // even before recovery.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // SQL Server side: the table must hold MSG_COUNT rows.
+            VerifyDatabaseTableIsFull();
+
+            // The queue is expected to be empty here as well.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Fills the queue with five batches of MSG_COUNT messages and runs
+        // ReadFromQueueAndInsertIntoDbWithCommit five times on one connection.
+        [Test]
+        public void TestIterativeTransactedConsume()
+        {
+            const int rounds = 5;
+
+            // Start with an empty table and a queue holding rounds * MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue(rounds * MSG_COUNT);
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                for (int round = 0; round < rounds; round++)
+                {
+                    ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+                }
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: all batches must have been stored.
+            VerifyDatabaseTableIsFull(rounds * MSG_COUNT);
+
+            // Broker side: no messages should remain on the queue.
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+
+        // Configures the recovery logger location through the connection URI, forces a
+        // commit failure, and checks that exactly one file appears in that directory
+        // and that none remain after VerifyBrokerQueueCount has run.
+        [Test]
+        public void TestConsumeWithDBInsertLogLocation()
+        {
+            const string logLocation = @".\RecoveryDir";
+            const string newConnectionUri =
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLogger.Location=" + logLocation;
+
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            // Recreate the recovery directory so it is known to be empty.
+            bool staleDirectory = Directory.Exists(logLocation);
+            if (staleDirectory)
+            {
+                Directory.Delete(logLocation, true);
+            }
+            Directory.CreateDirectory(logLocation);
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // Exactly one file is expected in the recovery directory at this point.
+            string[] filesBefore = Directory.GetFiles(logLocation);
+            Assert.AreEqual(1, filesBefore.Length);
+
+            // SQL Server side: the table must hold MSG_COUNT rows.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: zero messages expected, checked over the same custom URI.
+            VerifyBrokerQueueCount(0, newConnectionUri);
+
+            // The recovery directory must be empty again afterwards.
+            string[] filesAfter = Directory.GetFiles(logLocation);
+            Assert.AreEqual(0, filesAfter.Length);
+        }
+
+        // Runs ReadFromQueueAndInsertIntoDbWithScopeAborted and checks that neither the
+        // database nor the queue changed.
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: nothing was committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should still be on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Aborted-scope scenario with FailOnRollbackTransportHook attached to the faulty
+        // transport's OnewayCommandPreProcessor event.
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the rollback hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnRollbackTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: nothing was committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should be recovered and back on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Attaches FailOnPrepareTransportHook to OnewayCommandPreProcessor before the
+        // connection is started, then attempts a consume-and-insert with commit.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                // The hook goes in first; the listener and Start() follow.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnPrepareTransportHook;
+
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // No prepare reached the broker, so all MSG_COUNT messages are still browsable.
+            VerifyBrokerQueueCountNoRecovery();
+
+            // SQL Server side: nothing was committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should still be on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Attaches FailOnPrepareTransportHook to OnewayCommandPostProcessor before the
+        // connection is started, then attempts a consume-and-insert with commit.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionAfterPrepareSent()
+        {
+            // Start with an empty table and a queue holding MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                // The hook goes in first; the listener and Start() follow.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPostProcessor += FailOnPrepareTransportHook;
+
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // The messages are not visible yet: the prepared transaction still has to
+            // be rolled back.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // SQL Server side: nothing was committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should be back on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcProducerTransactionsTest.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
+{
+ [TestFixture]
+ [Category("Manual")]
+ class DtcProducerTransactionsTest : DtcTransactionsTestSupport
+ {
+        // Produces from the database to the queue while FailOnCommitTransportHook is
+        // attached to the faulty transport's OnewayCommandPreProcessor event.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionCommit()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(1000);
+            }
+
+            // Broker commit should not have gone through; no messages expected before recovery.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // SQL Server side: the transaction committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should be on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Produces from the database to the queue while FailOnCommitTransportHook is
+        // attached to the faulty transport's OnewayCommandPostProcessor event.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionPostCommitSend()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPostProcessor += FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(1000);
+            }
+
+            // Broker commit should have gone through; MSG_COUNT messages must be browsable.
+            VerifyBrokerQueueCountNoRecovery();
+
+            // SQL Server side: the transaction committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages should be on the queue.
+            VerifyBrokerQueueCount();
+        }
+
+        // Forces a commit failure, deletes every "*.bin" file from the recovery logger's
+        // directory, and then expects the queue to stay empty.
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogDeleted()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            // Remember where the file-based recovery logger writes its files.
+            NetTxConnectionFactory concreteFactory = txFactory as NetTxConnectionFactory;
+            RecoveryFileLogger fileLogger = concreteFactory.RecoveryPolicy.RecoveryLogger as RecoveryFileLogger;
+            string recoveryDirectory = fileLogger.Location;
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the commit hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // Broker commit should not have gone through; no messages expected before recovery.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // Remove all recovery files.
+            string[] recoveryFiles = Directory.GetFiles(recoveryDirectory, "*.bin");
+            for (int i = 0; i < recoveryFiles.Length; i++)
+            {
+                File.Delete(recoveryFiles[i]);
+            }
+
+            // SQL Server side: the transaction committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: the messages are NOT expected on the queue because the
+            // recovery files have been deleted.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Selects the "harness" recovery logger through the connection URI, attaches
+        // FailOnPreLogRecoveryHook to it, and expects neither side to change.
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogWriteFails()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            const string newConnectionUri =
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLoggerType=harness";
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                // The configured logger must be the test harness implementation.
+                NetTxConnection concreteConnection = txConnection as NetTxConnection;
+                IRecoveryLogger configuredLogger = concreteConnection.RecoveryPolicy.RecoveryLogger;
+                Assert.IsNotNull(configuredLogger);
+                RecoveryLoggerHarness loggerHarness = configuredLogger as RecoveryLoggerHarness;
+                Assert.IsNotNull(loggerHarness);
+
+                loggerHarness.PreLogRecoveryInfoEvent += FailOnPreLogRecoveryHook;
+
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: nothing was committed, so all MSG_COUNT rows remain.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: no messages expected on the queue.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Produces from the database to the queue while FailOnPrepareTransportHook is
+        // attached to the faulty transport's OnewayCommandPreProcessor event.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the prepare hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnPrepareTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: nothing was committed, so all MSG_COUNT rows remain.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: no messages expected on the queue.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Produces from the database to the queue while FailOnPrepareTransportHook is
+        // attached to the faulty transport's OnewayCommandPostProcessor event.
+        [Test]
+        public void TestRecoverAfterFailOnTransactionDuringPrepareSend()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the prepare hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPostProcessor += FailOnPrepareTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: the table must still hold all MSG_COUNT rows, i.e. the
+            // transaction did NOT commit.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: no messages expected on the queue.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Runs ReadFromDbAndProduceToQueueWithScopeAborted and checks that neither the
+        // database nor the queue changed.
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: nothing was committed, so all MSG_COUNT rows remain.
+            VerifyDatabaseTableIsFull();
+
+            // Broker side: no messages expected on the queue.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Aborted-scope produce scenario with FailOnRollbackTransportHook attached to the
+        // faulty transport's OnewayCommandPreProcessor event.
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Seed the test table with MSG_COUNT rows.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                // Dig out the fault-injecting transport layer and attach the rollback hook.
+                Connection amqConnection = txConnection as Connection;
+                ITransport wire = amqConnection.ITransport;
+                object narrowed = wire.Narrow(typeof(TcpFaultyTransport));
+                TcpFaultyTransport faultyTransport = narrowed as TcpFaultyTransport;
+                Assert.IsNotNull(faultyTransport);
+                faultyTransport.OnewayCommandPreProcessor += FailOnRollbackTransportHook;
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // SQL Server side: the table must still hold all MSG_COUNT rows, i.e. the
+            // transaction did NOT commit.
+            VerifyDatabaseTableIsFull();
+
+            // Before recovery, no messages are expected on the queue.
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // After recovery, still no messages are expected on the queue.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Re-seeds the table and runs ReadFromDbAndProduceToQueueWithCommit five times
+        // on one connection, expecting five batches of messages on the queue.
+        [Test]
+        public void TestIterativeTransactedProduceWithDBDelete()
+        {
+            const int rounds = 5;
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += OnException;
+                txConnection.Start();
+
+                for (int round = 0; round < rounds; round++)
+                {
+                    // Each round starts from a table holding MSG_COUNT rows.
+                    PrepareDatabase();
+                    ReadFromDbAndProduceToQueueWithCommit(txConnection);
+                }
+            }
+
+            // SQL Server side: the last transaction committed, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Broker side: all batches should be on the queue.
+            VerifyBrokerQueueCount(MSG_COUNT * rounds);
+        }
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs?rev=1073408&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcTransactionsTestSupport.cs Tue Feb 22 17:00:46 2011
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Transactions;
+using System.Threading;
+
+using NUnit.Framework;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using System.Data.SqlClient;
+using System.Collections;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ // PREREQUISITES to run these tests:
+ // - A local instance of SQL Server 2008 running, with a db (e.g. TestDB) that
+ // has a table (e.g. TestTable) with a single column (e.g. TestID) of type INT.
+ // The tests default to using an SQL connection string with a user id of
+ // 'user' and the password 'password'.
+ // - AMQ Server 5.4.2+
+ // - NMS 1.5+
+ //
+ // IMPORTANT
+ // Because you cannot perform recovery in a process more than once, you cannot
+ // run these tests sequentially in the NUnit GUI or NUnit Console runner; you
+ // must run them one at a time from the console or using a tool like the ReSharper
+ // plugin for Visual Studio.
+ //
+
+ public class DtcTransactionsTestSupport : NMSTestSupport
+ {
+ // Default number of rows inserted by PrepareDatabase and default expected count
+ // used by the verification helpers.
+ protected const int MSG_COUNT = 5;
+ // Assigned in SetUp: the current directory plus a freshly generated GUID.
+ protected string nonExistantPath;
+
+ // Tracer captured in SetUp and put back in TearDown.
+ private ITrace oldTracer;
+
+ // Connection string used by every database helper in this class.
+ protected const string sqlConnectionString =
+ "Data Source=localhost;Initial Catalog=TestDB;User ID=user;Password=password";
+ // Table and column the database helpers operate on.
+ protected const string testTable = "TestTable";
+ protected const string testColumn = "TestID";
+ // Queue the destination helpers operate on.
+ protected const string testQueueName = "TestQueue";
+ // Broker URI; always passed through ReplaceEnvVar before use.
+ protected const string connectionURI = "tcpfaulty://${activemqhost}:61616";
+
+        // Per-test setup: remembers the current tracer, builds a path that does not
+        // exist (current directory + separator + new GUID), runs the base setup and
+        // purges the test queue.
+        [SetUp]
+        public override void SetUp()
+        {
+            this.oldTracer = Tracer.Trace;
+
+            string workingDirectory = Directory.GetCurrentDirectory();
+            this.nonExistantPath = workingDirectory + Path.DirectorySeparatorChar + Guid.NewGuid();
+
+            base.SetUp();
+
+            PurgeDestination();
+        }
+
+        // Per-test cleanup: deletes the test queue, runs the base teardown and puts back
+        // the tracer that SetUp captured.
+        [TearDown]
+        public override void TearDown()
+        {
+            try
+            {
+                DeleteDestination();
+
+                base.TearDown();
+            }
+            finally
+            {
+                // Tracer.Trace is static state; restore it even when the cleanup above
+                // throws so a failed teardown cannot leave a test's tracer installed.
+                Tracer.Trace = this.oldTracer;
+            }
+        }
+
+        // Connection exception listener used by the tests: writes the exception
+        // message to the debug trace.
+        protected void OnException(Exception ex)
+        {
+            string notice = "Test Driver received Error Notification: " + ex.Message;
+            Tracer.Debug(notice);
+        }
+
+ #region Database Utility Methods
+
+        // Truncates the test table and then inserts MSG_COUNT rows whose column values
+        // run from 0 to MSG_COUNT - 1.
+        protected static void PrepareDatabase()
+        {
+            string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                // Start from an empty table.
+                using (SqlCommand truncateCommand = new SqlCommand(truncateSql, db))
+                {
+                    truncateCommand.ExecuteNonQuery();
+                }
+
+                // One INSERT statement per row.
+                int row = 0;
+                while (row < MSG_COUNT)
+                {
+                    string insertSql = string.Format(
+                        "INSERT INTO {0} ({1}) values ({2})", testTable, testColumn, row);
+
+                    using (SqlCommand insertCommand = new SqlCommand(insertSql, db))
+                    {
+                        insertCommand.ExecuteNonQuery();
+                    }
+
+                    ++row;
+                }
+
+                db.Close();
+            }
+        }
+
+        // Truncates the test table, leaving it empty.
+        protected static void PurgeDatabase()
+        {
+            string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                using (SqlCommand truncateCommand = new SqlCommand(truncateSql, db))
+                {
+                    truncateCommand.ExecuteNonQuery();
+                }
+
+                db.Close();
+            }
+        }
+
+        // Reads every value of the test column and returns one "Hello World <value>"
+        // string per row, in the order the reader yields them.
+        protected static IList ExtractDataSet()
+        {
+            string selectSql = string.Format("SELECT {0} FROM {1}", testColumn, testTable);
+            IList payloads = new ArrayList();
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                using (SqlCommand readCommand = new SqlCommand(selectSql, db))
+                {
+                    using (SqlDataReader rows = readCommand.ExecuteReader())
+                    {
+                        while (rows.Read())
+                        {
+                            // The column is unboxed as int before being appended.
+                            int id = (int)rows[0];
+                            payloads.Add("Hello World " + id);
+                        }
+                    }
+                }
+            }
+
+            return payloads;
+        }
+
+        // Asserts that the test table contains no rows.
+        protected static void VerifyDatabaseTableIsEmpty()
+        {
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is IDisposable; dispose it like the other helpers in this class do.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(0, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+ // Asserts that the test table contains exactly MSG_COUNT rows.
+ protected static void VerifyDatabaseTableIsFull()
+ {
+ VerifyDatabaseTableIsFull(MSG_COUNT);
+ }
+
+        // Asserts that the test table contains exactly `expected` rows.
+        protected static void VerifyDatabaseTableIsFull(int expected)
+        {
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is IDisposable; dispose it like the other helpers in this class do.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(expected, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+ #endregion
+
+ #region Destination Utility Methods
+
+        // Best-effort removal of the test queue from the broker. A failure to delete is
+        // traced but never propagated, so it cannot fail the calling teardown.
+        protected static void DeleteDestination()
+        {
+            IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (Connection connection = factory.CreateConnection() as Connection)
+            {
+                using (ISession session = connection.CreateSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+                    try
+                    {
+                        connection.DeleteDestination(queue);
+                    }
+                    catch (Exception ex)
+                    {
+                        // Still deliberately swallowed, but leave a trace so a queue that
+                        // could not be deleted is visible when debugging a later failure.
+                        Tracer.Debug("Failed to delete destination " + testQueueName + ": " + ex.Message);
+                    }
+                }
+            }
+        }
+
+        // Drains the test queue by receiving messages until a receive times out
+        // (3 seconds) without returning one.
+        protected static void PurgeDestination()
+        {
+            IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection connection = factory.CreateConnection())
+            {
+                connection.Start();
+
+                using (ISession session = connection.CreateSession())
+                using (IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(testQueueName)))
+                {
+                    // Receive inside the loop condition: the previous version received once
+                    // and never reassigned the variable, so a single queued message made
+                    // the loop spin forever and the rest of the queue was never drained.
+                    IMessage recvd;
+                    while ((recvd = consumer.Receive(TimeSpan.FromMilliseconds(3000))) != null)
+                    {
+                        Tracer.Debug("Setup Purged Message: " + recvd);
+                    }
+                }
+            }
+        }
+
+ protected static void PurgeAndFillQueue() // Convenience overload: empties the test queue, then enqueues MSG_COUNT messages.
+ {
+ PurgeAndFillQueue(MSG_COUNT);
+ }
+
+ /// <summary>
+ /// Empties the test queue and then enqueues <paramref name="msgCount"/>
+ /// persistent text messages whose bodies are the numbers 0 .. msgCount-1.
+ /// </summary>
+ /// <param name="msgCount">How many messages to place on the queue.</param>
+ protected static void PurgeAndFillQueue(int msgCount)
+ {
+     IConnectionFactory connectionFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+     using (IConnection brokerConnection = connectionFactory.CreateConnection())
+     {
+         brokerConnection.Start();
+
+         using (ISession session = brokerConnection.CreateSession())
+         {
+             IQueue target = session.GetQueue(testQueueName);
+
+             // Drain whatever is left over: keep receiving until a 2 second wait
+             // comes back empty-handed.
+             using (IMessageConsumer drain = session.CreateConsumer(target))
+             {
+                 IMessage leftover;
+                 do
+                 {
+                     leftover = drain.Receive(TimeSpan.FromMilliseconds(2000));
+                 }
+                 while (leftover != null);
+             }
+
+             // Refill the queue with the requested number of numbered messages.
+             using (IMessageProducer sender = session.CreateProducer(target))
+             {
+                 sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                 int sent = 0;
+                 while (sent < msgCount)
+                 {
+                     sender.Send(session.CreateTextMessage(sent.ToString()));
+                     sent++;
+                 }
+             }
+         }
+     }
+ }
+
+ #endregion
+
+ #region Broker Queue State Validation Routines
+
+ protected static void VerifyBrokerQueueCountNoRecovery() // Convenience overload: expects MSG_COUNT messages on the test queue.
+ {
+ VerifyBrokerQueueCountNoRecovery(MSG_COUNT);
+ }
+
+ /// <summary>
+ /// Browses the test queue over a plain (non-NetTx) connection and asserts
+ /// that it holds exactly <paramref name="expectedNumberOfMessages"/> messages.
+ /// </summary>
+ /// <param name="expectedNumberOfMessages">The number of messages the queue must contain.</param>
+ protected static void VerifyBrokerQueueCountNoRecovery(int expectedNumberOfMessages)
+ {
+     IConnectionFactory connectionFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+     using (IConnection brokerConnection = connectionFactory.CreateConnection())
+     using (ISession browseSession = brokerConnection.CreateSession())
+     {
+         IQueue target = browseSession.GetQueue(testQueueName);
+
+         using (IQueueBrowser queueBrowser = browseSession.CreateBrowser(target))
+         {
+             // The connection is started only once the browser exists.
+             brokerConnection.Start();
+
+             // Walk the browser, checking each entry is a message and tallying it.
+             int seen = 0;
+             for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); seen++)
+             {
+                 Assert.IsNotNull(cursor.Current as IMessage, "message is not in the queue !");
+             }
+
+             // The tally must match what the caller expects.
+             Assert.AreEqual(expectedNumberOfMessages, seen);
+         }
+     }
+ }
+
+ protected static void VerifyBrokerQueueCount() // Convenience overload: expects MSG_COUNT messages, using the default connectionURI.
+ {
+ VerifyBrokerQueueCount(MSG_COUNT, connectionURI);
+ }
+
+ protected static void VerifyBrokerQueueCount(int expectedCount) // Convenience overload: caller-supplied count, default connectionURI.
+ {
+ VerifyBrokerQueueCount(expectedCount, connectionURI);
+ }
+
+ protected static void VerifyBrokerQueueCount(string connectionUri) // Convenience overload: expects MSG_COUNT messages, caller-supplied broker URI.
+ {
+ VerifyBrokerQueueCount(MSG_COUNT, connectionUri);
+ }
+
+ /// <summary>
+ /// Browses the test queue over a NetTx connection created from
+ /// <paramref name="connectionUri"/> and asserts that it holds exactly
+ /// <paramref name="expectedCount"/> messages.
+ /// </summary>
+ /// <param name="expectedCount">The number of messages the queue must contain.</param>
+ /// <param name="connectionUri">Broker URI; passed through ReplaceEnvVar before use.</param>
+ protected static void VerifyBrokerQueueCount(int expectedCount, string connectionUri)
+ {
+     INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionUri));
+
+     using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+     using (INetTxSession txSession = txConnection.CreateNetTxSession())
+     {
+         IQueue target = txSession.GetQueue(testQueueName);
+
+         using (IQueueBrowser queueBrowser = txSession.CreateBrowser(target))
+         {
+             // The connection is started only once the browser exists.
+             txConnection.Start();
+
+             // Walk the browser, checking each entry is a message and tallying it.
+             int seen = 0;
+             for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); seen++)
+             {
+                 Assert.IsNotNull(cursor.Current as IMessage, "message is not in the queue !");
+             }
+
+             // The tally must match what the caller expects.
+             Assert.AreEqual(expectedCount, seen);
+         }
+     }
+ }
+
+ protected static void VerifyNoMessagesInQueueNoRecovery() // Asserts the test queue is empty, via the plain-connection check.
+ {
+ VerifyBrokerQueueCountNoRecovery(0);
+ }
+
+ protected static void VerifyNoMessagesInQueue() // Asserts the test queue is empty, via the NetTx-connection check.
+ {
+ VerifyBrokerQueueCount(0);
+ }
+
+ #endregion
+
+ #region Transport Hooks for controlling failure point.
+
+ /// <summary>
+ /// Transport hook that fails when a Prepare transaction command is seen.
+ /// For such a command it sleeps one second, traces, and throws; every other
+ /// command passes through untouched.
+ /// </summary>
+ /// <param name="transport">The transport involved; not used by this hook.</param>
+ /// <param name="command">The command to inspect.</param>
+ public void FailOnPrepareTransportHook(ITransport transport, Command command)
+ {
+     TransactionInfo info = command as TransactionInfo;
+     if (info == null || info.Type != (byte)TransactionType.Prepare)
+     {
+         // Not a Prepare: nothing to sabotage.
+         return;
+     }
+
+     Thread.Sleep(1000);
+     Tracer.Debug("Throwing Error on Prepare.");
+     throw new Exception("Error writing Prepare command");
+ }
+
+ /// <summary>
+ /// Transport hook that fails when a Rollback transaction command is seen.
+ /// For such a command it traces and throws; every other command passes
+ /// through untouched.
+ /// </summary>
+ /// <param name="transport">The transport involved; not used by this hook.</param>
+ /// <param name="command">The command to inspect.</param>
+ public void FailOnRollbackTransportHook(ITransport transport, Command command)
+ {
+     TransactionInfo info = command as TransactionInfo;
+     if (info == null || info.Type != (byte)TransactionType.Rollback)
+     {
+         // Not a Rollback: nothing to sabotage.
+         return;
+     }
+
+     Tracer.Debug("Throwing Error on Rollback.");
+     throw new Exception("Error writing Rollback command");
+ }
+
+ /// <summary>
+ /// Transport hook that fails when a two-phase Commit transaction command is
+ /// seen. For such a command it traces and throws; every other command
+ /// passes through untouched.
+ /// </summary>
+ /// <param name="transport">The transport involved; not used by this hook.</param>
+ /// <param name="command">The command to inspect.</param>
+ public void FailOnCommitTransportHook(ITransport transport, Command command)
+ {
+     TransactionInfo info = command as TransactionInfo;
+     if (info == null || info.Type != (byte)TransactionType.CommitTwoPhase)
+     {
+         // Not a two-phase Commit: nothing to sabotage.
+         return;
+     }
+
+     Tracer.Debug("Throwing Error on Commit.");
+     throw new Exception("Error writing Commit command");
+ }
+
+ #endregion
+
+ #region Recovery Harness Hooks for controlling failure conditions
+
+ public void FailOnPreLogRecoveryHook(XATransactionId xid, byte[] recoveryInformatio) // Recovery hook that always fails; neither argument is read.
+ {
+ Tracer.Debug("Throwing Error before the Recovery Information is Logged.");
+ throw new Exception("Intentional Error Logging Recovery Information"); // unconditional: this hook exists only to inject a failure
+ }
+
+ #endregion
+
+ #region Produce Messages use cases
+
+ /// <summary>
+ /// Takes the entries returned by ExtractDataSet, sends each one as a persistent
+ /// text message to the test queue and deletes all rows of the test table, all
+ /// inside a new TransactionScope that is marked complete at the end.
+ /// Any exception raised inside the scope (including one surfacing when the
+ /// scope is disposed) is traced and swallowed.
+ /// </summary>
+ /// <param name="connection">The NetTx connection used to create the session.</param>
+ protected static void ReadFromDbAndProduceToQueueWithCommit(INetTxConnection connection)
+ {
+     IList rows = ExtractDataSet();
+
+     using (INetTxSession txSession = connection.CreateNetTxSession())
+     {
+         IQueue target = txSession.GetQueue(testQueueName);
+
+         try
+         {
+             using (IMessageProducer sender = txSession.CreateProducer(target))
+             {
+                 sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                 // The scope is opened before the SQL connection, as in every
+                 // use case of this class.
+                 using (TransactionScope txScope = new TransactionScope(TransactionScopeOption.RequiresNew))
+                 {
+                     using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                     {
+                         db.Open();
+
+                         Assert.IsNotNull(Transaction.Current);
+
+                         // Broker side: one message per row read from the DB.
+                         Tracer.DebugFormat("Sending {0} messages to Broker in this TX", rows.Count);
+                         foreach (string body in rows)
+                         {
+                             sender.Send(txSession.CreateTextMessage(body));
+                         }
+
+                         // Database side: remove the rows that were just sent.
+                         using (SqlCommand deleteAll = new SqlCommand(
+                             string.Format("DELETE FROM {0}", testTable), db))
+                         {
+                             int deleted = deleteAll.ExecuteNonQuery();
+                             Assert.AreEqual(rows.Count, deleted, "wrong number of rows deleted");
+                         }
+
+                         txScope.Complete();
+                     }
+                 }
+             }
+         }
+         catch (Exception error) // exception thrown in TransactionContext.Commit(Enlistment enlistment)
+         {
+             Tracer.Debug("TX;Error from TransactionScope: " + error.Message);
+             Tracer.Debug(error.ToString());
+         }
+     }
+ }
+
+ /// <summary>
+ /// Same work as the committing variant: sends each entry returned by
+ /// ExtractDataSet as a persistent text message and deletes all rows of the test
+ /// table inside a new TransactionScope. Here the scope is disposed without
+ /// ever being marked complete.
+ /// Any exception raised inside the scope is traced and swallowed.
+ /// </summary>
+ /// <param name="connection">The NetTx connection used to create the session.</param>
+ protected static void ReadFromDbAndProduceToQueueWithScopeAborted(INetTxConnection connection)
+ {
+     IList rows = ExtractDataSet();
+
+     using (INetTxSession txSession = connection.CreateNetTxSession())
+     {
+         IQueue target = txSession.GetQueue(testQueueName);
+
+         try
+         {
+             using (IMessageProducer sender = txSession.CreateProducer(target))
+             {
+                 sender.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                 // The scope is opened before the SQL connection, as in every
+                 // use case of this class.
+                 using (TransactionScope txScope = new TransactionScope(TransactionScopeOption.RequiresNew))
+                 {
+                     using (SqlConnection db = new SqlConnection(sqlConnectionString))
+                     {
+                         db.Open();
+
+                         Assert.IsNotNull(Transaction.Current);
+
+                         // Broker side: one message per row read from the DB.
+                         Tracer.DebugFormat("Sending {0} messages to Broker in this TX", rows.Count);
+                         foreach (string body in rows)
+                         {
+                             sender.Send(txSession.CreateTextMessage(body));
+                         }
+
+                         // Database side: remove the rows that were just sent.
+                         using (SqlCommand deleteAll = new SqlCommand(
+                             string.Format("DELETE FROM {0}", testTable), db))
+                         {
+                             int deleted = deleteAll.ExecuteNonQuery();
+                             Assert.AreEqual(rows.Count, deleted, "wrong number of rows deleted");
+                         }
+
+                         // Intentionally no txScope.Complete() in this variant.
+                     }
+                 }
+             }
+         }
+         catch (Exception error)
+         {
+             Tracer.Debug("TX;Error from TransactionScope: " + error.Message);
+             Tracer.Debug(error.ToString());
+         }
+     }
+ }
+
+ #endregion
+
+ #region Consume Messages Use Cases
+
+ /// <summary>
+ /// Receives MSG_COUNT text messages from the test queue and inserts the
+ /// integer value of each message body into the test table, all inside a new
+ /// TransactionScope that is marked complete at the end.
+ /// Any exception raised inside the scope (including one surfacing when the
+ /// scope is disposed) is traced and swallowed.
+ /// </summary>
+ /// <param name="connection">The NetTx connection used to create the session.</param>
+ protected static void ReadFromQueueAndInsertIntoDbWithCommit(INetTxConnection connection)
+ {
+     using (INetTxSession txSession = connection.CreateNetTxSession())
+     {
+         IQueue source = txSession.GetQueue(testQueueName);
+
+         try
+         {
+             // Disposal runs in reverse order: command, SQL connection, scope, consumer.
+             using (IMessageConsumer receiver = txSession.CreateConsumer(source))
+             using (TransactionScope txScope = new TransactionScope(TransactionScopeOption.RequiresNew))
+             using (SqlConnection db = new SqlConnection(sqlConnectionString))
+             using (SqlCommand insertRow = new SqlCommand())
+             {
+                 db.Open();
+                 insertRow.Connection = db;
+
+                 Assert.IsNotNull(Transaction.Current);
+
+                 int handled = 0;
+                 while (handled < MSG_COUNT)
+                 {
+                     // Receive() is called without a timeout here.
+                     ITextMessage incoming = receiver.Receive() as ITextMessage;
+                     Assert.IsNotNull(incoming, "missing message");
+
+                     // The body is converted to an int before it is placed in the SQL text.
+                     int value = Convert.ToInt32(incoming.Text);
+                     insertRow.CommandText =
+                         string.Format("INSERT INTO {0} VALUES ({1})", testTable, value);
+                     insertRow.ExecuteNonQuery();
+
+                     handled++;
+                 }
+
+                 txScope.Complete();
+             }
+         }
+         catch (Exception error)
+         {
+             Tracer.Debug("TX;Error from TransactionScope: " + error.Message);
+             Tracer.Debug(error.ToString());
+         }
+     }
+ }
+
+ /// <summary>
+ /// Same work as the committing variant: receives MSG_COUNT text messages from
+ /// the test queue and inserts the integer value of each body into the test
+ /// table inside a new TransactionScope. Here the scope is disposed without
+ /// ever being marked complete.
+ /// Any exception raised inside the scope is traced and swallowed.
+ /// </summary>
+ /// <param name="connection">The NetTx connection used to create the session.</param>
+ protected static void ReadFromQueueAndInsertIntoDbWithScopeAborted(INetTxConnection connection)
+ {
+     using (INetTxSession txSession = connection.CreateNetTxSession())
+     {
+         IQueue source = txSession.GetQueue(testQueueName);
+
+         try
+         {
+             // Disposal runs in reverse order: command, SQL connection, scope, consumer.
+             using (IMessageConsumer receiver = txSession.CreateConsumer(source))
+             using (TransactionScope txScope = new TransactionScope(TransactionScopeOption.RequiresNew))
+             using (SqlConnection db = new SqlConnection(sqlConnectionString))
+             using (SqlCommand insertRow = new SqlCommand())
+             {
+                 db.Open();
+                 insertRow.Connection = db;
+                 Assert.IsNotNull(Transaction.Current);
+
+                 int handled = 0;
+                 while (handled < MSG_COUNT)
+                 {
+                     // Receive() is called without a timeout here.
+                     ITextMessage incoming = receiver.Receive() as ITextMessage;
+                     Assert.IsNotNull(incoming, "missing message");
+
+                     // The body is converted to an int before it is placed in the SQL text.
+                     int value = Convert.ToInt32(incoming.Text);
+                     insertRow.CommandText =
+                         string.Format("INSERT INTO {0} VALUES ({1})", testTable, value);
+                     insertRow.ExecuteNonQuery();
+
+                     handled++;
+                 }
+
+                 // Intentionally no txScope.Complete() in this variant.
+             }
+         }
+         catch (Exception error)
+         {
+             Tracer.Debug("TX;Error from TransactionScope: " + error.Message);
+             Tracer.Debug(error.ToString());
+         }
+     }
+ }
+
+ #endregion
+ }
+}