You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/28 16:30:50 UTC
svn commit: r1621142 [4/4] - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/
main/csharp/Commands/ main/csharp/Util/ test/csharp/Transport/failover/
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Aug 28 14:30:49 2014
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Util;
@@ -57,6 +58,7 @@ namespace Apache.NMS.ActiveMQ
protected bool disposed = false;
protected bool closed = false;
protected bool closing = false;
+ protected Atomic<bool> clearInProgress = new Atomic<bool>();
private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TimeSpan requestTimeout;
@@ -73,8 +75,8 @@ namespace Apache.NMS.ActiveMQ
this.requestTimeout = connection.RequestTimeout;
this.dispatchAsync = connection.DispatchAsync;
this.transactionContext = CreateTransactionContext();
- this.exclusive = connection.ExclusiveConsumer;
- this.retroactive = connection.UseRetroactiveConsumer;
+ this.exclusive = connection.ExclusiveConsumer;
+ this.retroactive = connection.UseRetroactiveConsumer;
Uri brokerUri = connection.BrokerUri;
@@ -282,10 +284,26 @@ namespace Apache.NMS.ActiveMQ
set { this.producerTransformer = value; }
}
- internal Scheduler Scheduler
- {
- get { return this.connection.Scheduler; }
- }
+ internal Scheduler Scheduler
+ {
+ get { return this.connection.Scheduler; }
+ }
+
+ internal List<MessageConsumer> Consumers
+ {
+ get
+ {
+ List<MessageConsumer> copy = new List<MessageConsumer>();
+ lock(consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in consumers.Values)
+ {
+ copy.Add(consumer);
+ }
+ }
+ return copy;
+ }
+ }
#endregion
@@ -338,13 +356,13 @@ namespace Apache.NMS.ActiveMQ
internal void DoClose()
{
- Shutdown();
+ Shutdown();
RemoveInfo removeInfo = new RemoveInfo();
removeInfo.ObjectId = this.info.SessionId;
removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
this.connection.Oneway(removeInfo);
- }
-
+ }
+
internal void Shutdown()
{
Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
@@ -524,18 +542,18 @@ namespace Apache.NMS.ActiveMQ
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
- if (IsIndividualAcknowledge)
- {
- throw new NMSException("Cannot create a durable consumer for a session that is using " +
- "Individual Acknowledgement mode.");
- }
+ if (IsIndividualAcknowledge)
+ {
+ throw new NMSException("Cannot create a durable consumer for a session that is using " +
+ "Individual Acknowledgement mode.");
+ }
ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
MessageConsumer consumer = null;
try
{
- consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, name, selector,
+ consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, name, selector,
this.connection.PrefetchPolicy.DurableTopicPrefetch,
this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
noLocal);
@@ -564,7 +582,7 @@ namespace Apache.NMS.ActiveMQ
}
internal virtual MessageConsumer DoCreateMessageConsumer(
- ConsumerId id, ActiveMQDestination destination, string name, string selector,
+ ConsumerId id, ActiveMQDestination destination, string name, string selector,
int prefetch, int maxPending, bool noLocal)
{
return new MessageConsumer(this, id, destination, name, selector, prefetch,
@@ -825,7 +843,7 @@ namespace Apache.NMS.ActiveMQ
{
consumers.Remove(consumer.ConsumerId);
}
- connection.RemoveDispatcher(consumer);
+ connection.RemoveDispatcher(consumer);
}
public void AddProducer(MessageProducer producer)
@@ -878,13 +896,13 @@ namespace Apache.NMS.ActiveMQ
public void Start()
{
- lock(this.consumers.SyncRoot)
- {
- foreach(MessageConsumer consumer in this.consumers.Values)
- {
- consumer.Start();
- }
- }
+ lock(this.consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in this.consumers.Values)
+ {
+ consumer.Start();
+ }
+ }
if(this.executor != null)
{
@@ -900,9 +918,13 @@ namespace Apache.NMS.ActiveMQ
}
}
- internal void Redispatch(MessageDispatchChannel channel)
+ internal void Redispatch(IDispatcher dispatcher, MessageDispatchChannel channel)
{
MessageDispatch[] messages = channel.RemoveAll();
+ foreach (MessageDispatch dispatch in messages)
+ {
+ this.connection.RollbackDuplicate(dispatcher, dispatch.Message);
+ }
System.Array.Reverse(messages);
foreach(MessageDispatch message in messages)
@@ -926,20 +948,31 @@ namespace Apache.NMS.ActiveMQ
this.executor.ClearMessagesInProgress();
}
+ if (this.consumers.Count == 0)
+ {
+ return;
+ }
+
// Because we are called from inside the Transport Reconnection logic
// we spawn the Consumer clear to another Thread so that we can avoid
// any lock contention that might exist between the consumer and the
- // connection that is reconnecting. Use the Connection Scheduler so
- // that the clear calls are done one at a time to avoid further
- // contention on the Connection and Session resources.
- lock(this.consumers.SyncRoot)
+ // connection that is reconnecting. Use the Connection Scheduler so
+ // that the clear calls are done one at a time to avoid further
+ // contention on the Connection and Session resources.
+ if (clearInProgress.CompareAndSet(false, true))
{
- foreach(MessageConsumer consumer in this.consumers.Values)
+ lock(this.consumers.SyncRoot)
{
- consumer.InProgressClearRequired();
- Interlocked.Increment(ref transportInterruptionProcessingComplete);
- Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
+ foreach(MessageConsumer consumer in this.consumers.Values)
+ {
+ consumer.InProgressClearRequired();
+ Interlocked.Increment(ref transportInterruptionProcessingComplete);
+ Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
+ }
}
+
+ // Reset the flag after all consumers have had their ClearMessagesInProgress method called.
+ Scheduler.ExecuteAfterDelay(ResetClearInProgressFlag, clearInProgress, 0);
}
}
@@ -955,6 +988,12 @@ namespace Apache.NMS.ActiveMQ
consumer.ClearMessagesInProgress();
}
+ private static void ResetClearInProgressFlag(object value)
+ {
+ Atomic<bool> clearInProgress = value as Atomic<bool>;
+ clearInProgress.Value = false;
+ }
+
internal void Acknowledge()
{
lock(this.consumers.SyncRoot)
@@ -986,10 +1025,10 @@ namespace Apache.NMS.ActiveMQ
internal void SendAck(MessageAck ack, bool lazy)
{
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Session sending Ack: " + ack);
- }
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Session sending Ack: " + ack);
+ }
if(lazy || connection.SendAcksAsync || this.IsTransacted )
{
@@ -1001,10 +1040,10 @@ namespace Apache.NMS.ActiveMQ
}
}
- protected virtual TransactionContext CreateTransactionContext()
- {
- return new TransactionContext(this);
- }
+ protected virtual TransactionContext CreateTransactionContext()
+ {
+ return new TransactionContext(this);
+ }
private void CheckClosed()
{
@@ -1055,16 +1094,16 @@ namespace Apache.NMS.ActiveMQ
internal bool IsInUse(ActiveMQTempDestination dest)
{
- lock(this.consumers.SyncRoot)
- {
- foreach(MessageConsumer consumer in this.consumers.Values)
- {
- if(consumer.IsInUse(dest))
- {
- return true;
- }
- }
- }
+ lock(this.consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in this.consumers.Values)
+ {
+ if(consumer.IsInUse(dest))
+ {
+ return true;
+ }
+ }
+ }
return false;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Thu Aug 28 14:30:49 2014
@@ -15,12 +15,13 @@
* limitations under the License.
*/
+using System;
using System.Collections;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
{
- public enum TransactionType
+ public enum TransactionType
{
Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7
}
@@ -28,14 +29,14 @@ namespace Apache.NMS.ActiveMQ
namespace Apache.NMS.ActiveMQ
{
- public class TransactionContext
+ public class TransactionContext
{
protected readonly Session session;
protected readonly Connection connection;
protected readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
public TransactionContext(Session session)
- {
+ {
this.session = session;
this.connection = session.Connection;
}
@@ -50,33 +51,33 @@ namespace Apache.NMS.ActiveMQ
get{ return this.TransactionId != null; }
}
- public TransactionId TransactionId
- {
- get;
- protected set;
- }
-
+ public TransactionId TransactionId
+ {
+ get;
+ protected set;
+ }
+
public void AddSynchronization(ISynchronization synchronization)
{
synchronizations.Add(synchronization);
}
-
+
public void RemoveSynchronization(ISynchronization synchronization)
{
synchronizations.Remove(synchronization);
}
-
+
public virtual void Begin()
{
if(!InTransaction)
{
this.TransactionId = this.session.Connection.CreateLocalTransactionId();
-
+
TransactionInfo info = new TransactionInfo();
info.ConnectionId = this.session.Connection.ConnectionId;
info.TransactionId = this.TransactionId;
info.Type = (int) TransactionType.Begin;
-
+
this.session.Connection.Oneway(info);
SignalTransactionStarted();
@@ -87,54 +88,78 @@ namespace Apache.NMS.ActiveMQ
}
}
}
-
+
public virtual void Rollback()
{
if(InTransaction)
{
- this.BeforeEnd();
-
+ try
+ {
+ this.BeforeEnd();
+ }
+ catch (TransactionRolledBackException canOccurOnFailover)
+ {
+ Tracer.WarnFormat("Rollback processing error {0}", canOccurOnFailover.Message);
+ }
+
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("Rollback: " + this.TransactionId +
" syncCount: " +
(synchronizations != null ? synchronizations.Count : 0));
}
-
+
TransactionInfo info = new TransactionInfo();
info.ConnectionId = this.session.Connection.ConnectionId;
info.TransactionId = this.TransactionId;
info.Type = (int) TransactionType.Rollback;
-
+
this.TransactionId = null;
this.session.Connection.SyncRequest(info);
-
+
this.AfterRollback();
}
}
-
+
public virtual void Commit()
{
if(InTransaction)
{
- this.BeforeEnd();
-
+ try
+ {
+ this.BeforeEnd();
+ }
+ catch
+ {
+ Rollback();
+ throw;
+ }
+
if(Tracer.IsDebugEnabled)
{
Tracer.Debug("Commit: " + this.TransactionId +
" syncCount: " +
(synchronizations != null ? synchronizations.Count : 0));
}
-
+
TransactionInfo info = new TransactionInfo();
info.ConnectionId = this.session.Connection.ConnectionId;
info.TransactionId = this.TransactionId;
info.Type = (int) TransactionType.CommitOnePhase;
-
- this.TransactionId = null;
- this.session.Connection.SyncRequest(info);
-
- this.AfterCommit();
+
+ try
+ {
+ this.TransactionId = null;
+ this.session.Connection.SyncRequest(info);
+ this.AfterCommit();
+ }
+ catch (Exception e)
+ {
+ Tracer.InfoFormat("Commit failed for transaction {0} - {1}",
+ info.TransactionId, e.Message);
+ AfterRollback();
+ throw;
+ }
}
}
@@ -219,7 +244,7 @@ namespace Apache.NMS.ActiveMQ
}
}
- #endregion
+ #endregion
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -38,18 +38,18 @@ namespace Apache.NMS.ActiveMQ.Util
{
get{ return this.mutex; }
}
-
+
public bool Closed
{
- get
+ get
{
lock(this.mutex)
{
- return this.closed;
+ return this.closed;
}
}
-
- set
+
+ set
{
lock(this.mutex)
{
@@ -67,7 +67,7 @@ namespace Apache.NMS.ActiveMQ.Util
return this.running;
}
}
-
+
set
{
lock(this.mutex)
@@ -130,12 +130,12 @@ namespace Apache.NMS.ActiveMQ.Util
{
this.running = false;
this.closed = true;
- }
+ }
Monitor.PulseAll(this.mutex);
- }
+ }
}
-
+
public void Enqueue(MessageDispatch dispatch)
{
lock(this.mutex)
@@ -163,27 +163,27 @@ namespace Apache.NMS.ActiveMQ.Util
{
Monitor.Wait(this.mutex, timeout);
}
-
- if( Closed || !Running || Empty )
+
+ if( Closed || !Running || Empty )
{
return null;
}
-
- return DequeueNoWait();
+
+ return DequeueNoWait();
}
}
public MessageDispatch DequeueNoWait()
{
MessageDispatch result = null;
-
+
lock(this.mutex)
{
- if( Closed || !Running || Empty )
+ if( Closed || !Running || Empty )
{
return null;
}
-
+
result = channel.First.Value;
this.channel.RemoveFirst();
}
@@ -195,11 +195,11 @@ namespace Apache.NMS.ActiveMQ.Util
{
lock(this.mutex)
{
- if( Closed || !Running || Empty )
+ if( Closed || !Running || Empty )
{
return null;
}
-
+
return channel.First.Value;
}
}
@@ -215,7 +215,7 @@ namespace Apache.NMS.ActiveMQ.Util
public MessageDispatch[] RemoveAll()
{
MessageDispatch[] result;
-
+
lock(mutex)
{
result = new MessageDispatch[this.Count];
@@ -225,6 +225,14 @@ namespace Apache.NMS.ActiveMQ.Util
return result;
}
+
+ public void Signal()
+ {
+ lock(mutex)
+ {
+ Monitor.PulseAll(this.mutex);
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -33,7 +33,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get;
}
-
+
bool Closed
{
get;
@@ -61,7 +61,7 @@ namespace Apache.NMS.ActiveMQ.Util
void Stop();
void Close();
-
+
void Enqueue(MessageDispatch dispatch);
void EnqueueFirst(MessageDispatch dispatch);
@@ -75,5 +75,7 @@ namespace Apache.NMS.ActiveMQ.Util
void Clear();
MessageDispatch[] RemoveAll();
+
+ void Signal();
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -46,15 +46,15 @@ namespace Apache.NMS.ActiveMQ.Util
{
get{ return this.mutex; }
}
-
+
public bool Closed
{
- get
+ get
{
- return this.closed;
+ return this.closed;
}
-
- set
+
+ set
{
lock(this.mutex)
{
@@ -69,7 +69,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
return this.running;
}
-
+
set
{
lock(this.mutex)
@@ -126,12 +126,12 @@ namespace Apache.NMS.ActiveMQ.Util
{
this.running = false;
this.closed = true;
- }
+ }
Monitor.PulseAll(this.mutex);
- }
+ }
}
-
+
public void Enqueue(MessageDispatch dispatch)
{
lock(this.mutex)
@@ -161,12 +161,12 @@ namespace Apache.NMS.ActiveMQ.Util
{
Monitor.Wait(this.mutex, timeout);
}
-
- if( Closed || !Running || Empty )
+
+ if( Closed || !Running || Empty )
{
return null;
}
-
+
return RemoveFirst();
}
}
@@ -174,14 +174,14 @@ namespace Apache.NMS.ActiveMQ.Util
public MessageDispatch DequeueNoWait()
{
MessageDispatch result = null;
-
+
lock(this.mutex)
{
- if( Closed || !Running || Empty )
+ if( Closed || !Running || Empty )
{
return null;
}
-
+
result = RemoveFirst();
}
@@ -192,11 +192,11 @@ namespace Apache.NMS.ActiveMQ.Util
{
lock(this.mutex)
{
- if( Closed || !Running || Empty )
+ if( Closed || !Running || Empty )
{
return null;
}
-
+
return GetFirst();
}
}
@@ -215,7 +215,7 @@ namespace Apache.NMS.ActiveMQ.Util
public MessageDispatch[] RemoveAll()
{
MessageDispatch[] result;
-
+
lock(mutex)
{
result = new MessageDispatch[this.size];
@@ -234,6 +234,14 @@ namespace Apache.NMS.ActiveMQ.Util
return result;
}
+ public void Signal()
+ {
+ lock(mutex)
+ {
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
protected int GetPriority(MessageDispatch message)
{
int priority = (int) MsgPriority.Lowest;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs Thu Aug 28 14:30:49 2014
@@ -72,7 +72,7 @@ namespace Apache.NMS.ActiveMQ.Test
using(ISession session = connection.CreateSession())
{
IDestination destination = session.GetQueue(destinationName);
- PurgeQueue(connection, destination);
+ DeleteQueue(connection, destination);
}
Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);
@@ -89,10 +89,12 @@ namespace Apache.NMS.ActiveMQ.Test
}
catch(TransactionRolledBackException)
{
+ Tracer.Info("TEST: Caught expected TransactionRolledBackException");
}
- catch
+ catch(Exception ex)
{
- Assert.Fail("Should have thrown a TransactionRolledBackException");
+ Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " +
+ ex.GetType().Name);
}
}
@@ -240,6 +242,50 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.IsTrue(this.resumed);
}
+ [Test]
+ public void TestMessageDeliveredAfterCommitFailsAndRollback()
+ {
+ string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
+ IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+ using(connection = factory.CreateConnection() as Connection)
+ {
+ using(ISession session = connection.CreateSession())
+ {
+ IDestination destination = session.GetQueue(destinationName);
+ DeleteQueue(connection, destination);
+ PutOneMsgIntoQueue(session, destination);
+ }
+
+ using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+ {
+ connection.Start();
+
+ ITransport transport = (connection as Connection).ITransport;
+ TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+ Assert.IsNotNull(tcpFaulty);
+ tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+ IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName));
+ IMessage message = consumer.Receive(TimeSpan.FromSeconds(30));
+ Assert.IsNotNull(message, "Message was not delivered");
+ Tracer.Debug("Commiting transaction");
+
+ try
+ {
+ Tracer.Info("Now attempting to commit the transaction");
+ session.Commit();
+ }
+ catch (Exception ex)
+ {
+ Tracer.InfoFormat("Commit failed as expected. {0}", ex.Message);
+ }
+
+ message = consumer.Receive(TimeSpan.FromSeconds(30));
+ Assert.IsNotNull(message, "message was not redilivered");
+ }
+ }
+ }
+
public void TransportInterrupted()
{
this.interrupted = true;
@@ -252,15 +298,25 @@ namespace Apache.NMS.ActiveMQ.Test
private void PutMsgIntoQueue(ISession session, IDestination destination)
{
- PutMsgIntoQueue(session, destination, true);
+ PutMsgIntoQueue(session, destination, true, MSG_COUNT);
+ }
+
+ private void PutOneMsgIntoQueue(ISession session, IDestination destination)
+ {
+ PutMsgIntoQueue(session, destination, true, 1);
}
private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit)
{
+ PutMsgIntoQueue(session, destination, commit, MSG_COUNT);
+ }
+
+ private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit, int count)
+ {
using(IMessageProducer producer = session.CreateProducer(destination))
{
ITextMessage message = session.CreateTextMessage();
- for(int i = 0; i < MSG_COUNT; ++i)
+ for(int i = 0; i < count; ++i)
{
message.Text = "Test message " + (i + 1);
producer.Send(message);
@@ -284,6 +340,14 @@ namespace Apache.NMS.ActiveMQ.Test
}
}
+ private void DeleteQueue(IConnection connection, IDestination queue)
+ {
+ using (ISession session = connection.CreateSession())
+ {
+ session.DeleteDestination(queue);
+ }
+ }
+
private void BreakConnection()
{
TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport;
@@ -303,13 +367,13 @@ namespace Apache.NMS.ActiveMQ.Test
TransactionInfo txInfo = command as TransactionInfo;
if (txInfo.Type == (byte)TransactionType.CommitOnePhase)
{
- Tracer.Debug("Closing the TcpTransport to simulate an connection drop.");
+ Tracer.Debug("Exception from the Commit to simulate an connection drop.");
commitFailed = true;
- (transport as TcpTransport).Close();
+ TcpTransport tcpTransport = transport as TcpTransport;
+ tcpTransport.Close();
}
}
}
-
}
}