You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/08 17:27:16 UTC
svn commit: r961805 - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./
Commands/ State/
Author: tabish
Date: Thu Jul 8 15:27:15 2010
New Revision: 961805
URL: http://svn.apache.org/viewvc?rev=961805&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-254
Adds more failover handling, basic failover is working now.
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseMessage.cs Thu Jul 8 15:27:15 2010
@@ -19,6 +19,7 @@ using System;
using System.Collections;
using Apache.NMS.Util;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -264,6 +265,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processMessage( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionError.cs Thu Jul 8 15:27:15 2010
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -75,6 +76,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processConnectionError( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs Thu Jul 8 15:27:15 2010
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -91,6 +92,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processAddConnection( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConsumerInfo.cs Thu Jul 8 15:27:15 2010
@@ -19,6 +19,7 @@ using System;
using System.Collections;
using Apache.NMS;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -167,6 +168,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processAddConsumer( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageAck.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class MessageAck : BaseCommand
@@ -115,6 +117,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processMessageAck( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/MessageDispatch.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class MessageDispatch : BaseCommand
@@ -133,6 +135,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processMessageDispatch( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class ProducerInfo : BaseCommand
@@ -83,6 +85,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processAddProducer( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs Thu Jul 8 15:27:15 2010
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -61,6 +62,29 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ ///
+ /// <summery>
+ /// Allows a Visitor to visit this command and return a response to the
+ /// command based on the command type being visited. The command will call
+ /// the proper processXXX method in the visitor.
+ /// </summery>
+ ///
+ public override Response visit(ICommandVisitor visitor)
+ {
+ switch(objectId.GetDataStructureType())
+ {
+ case DataStructureTypes.ConnectionIdType:
+ return visitor.processRemoveConnection((ConnectionId) objectId);
+ case DataStructureTypes.SessionIdType:
+ return visitor.processRemoveSession((SessionId) objectId);
+ case DataStructureTypes.ConsumerIdType:
+ return visitor.processRemoveConsumer((ConsumerId) objectId);
+ case DataStructureTypes.ProducerIdType:
+ return visitor.processRemoveProducer((ProducerId) objectId);
+ default:
+ throw new IOException("Unknown remove command type: " + objectId.GetDataStructureType());
+ }
+ }
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveSubscriptionInfo.cs Thu Jul 8 15:27:15 2010
@@ -17,6 +17,7 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
namespace Apache.NMS.Stomp.Commands
{
@@ -92,6 +93,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processRemoveSubscriptionInfo( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SessionInfo.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class SessionInfo : BaseCommand
@@ -74,6 +76,11 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ return visitor.processAddSession( this );
+ }
+
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/SubscriptionInfo.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class SubscriptionInfo : BaseDataStructure
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TransactionInfo.cs Thu Jul 8 15:27:15 2010
@@ -18,6 +18,8 @@
using System;
using System.Collections;
+using Apache.NMS.Stomp.State;
+
namespace Apache.NMS.Stomp.Commands
{
public class TransactionInfo : BaseCommand
@@ -87,6 +89,20 @@ namespace Apache.NMS.Stomp.Commands
}
}
+ public override Response visit(ICommandVisitor visitor)
+ {
+ switch(type)
+ {
+ case TransactionInfo.BEGIN:
+ return visitor.processBeginTransaction(this);
+ case TransactionInfo.COMMIT:
+ return visitor.processCommitTransaction(this);
+ case TransactionInfo.ROLLBACK:
+ return visitor.processRollbackTransaction(this);
+ default:
+ throw new IOException("Transaction info type unknown: " + type);
+ }
+ }
};
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Thu Jul 8 15:27:15 2010
@@ -20,6 +20,7 @@ using System.Collections;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Transport;
+using Apache.NMS.Stomp.Transport.Failover;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;
@@ -61,6 +62,7 @@ namespace Apache.NMS.Stomp
private ConnectionMetaData metaData = null;
private bool disposed = false;
private IdGenerator clientIdGenerator;
+ private CountDownLatch transportInterruptionProcessingComplete;
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -532,6 +534,9 @@ namespace Apache.NMS.Stomp
{
if(command is MessageDispatch)
{
+ // We wait if the Connection is still processing interruption
+ // code to reset the MessageConsumers.
+ WaitForTransportInterruptionProcessingToComplete();
DispatchMessage((MessageDispatch) command);
}
else if(command is ConnectionError)
@@ -606,6 +611,12 @@ namespace Apache.NMS.Stomp
{
Tracer.Debug("Transport has been Interrupted.");
+ this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
+ }
+
foreach(Session session in this.sessions)
{
session.ClearMessagesInProgress();
@@ -682,5 +693,28 @@ namespace Apache.NMS.Stomp
answer.SessionId = sessionId;
return answer;
}
+
+ private void WaitForTransportInterruptionProcessingToComplete()
+ {
+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+ if(cdl != null)
+ {
+ if(!closed && cdl.Remaining > 0)
+ {
+ Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
+ "processing (" + cdl.Remaining + ") to complete..");
+ cdl.await(TimeSpan.FromSeconds(10));
+ }
+ }
+ }
+
+ internal void TransportInterruptionProcessingComplete()
+ {
+ CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+ if(cdl != null)
+ {
+ cdl.countDown();
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Thu Jul 8 15:27:15 2010
@@ -53,6 +53,7 @@ namespace Apache.NMS.Stomp
private int dispatchedCount = 0;
private volatile bool synchronizationRegistered = false;
private bool clearDispatchList = false;
+ private bool inProgressClearRequiredFlag;
private event MessageListener listener;
@@ -314,16 +315,38 @@ namespace Apache.NMS.Stomp
this.unconsumedMessages.Stop();
}
- public void ClearMessagesInProgress()
+ internal void InProgressClearRequired()
{
- // we are called from inside the transport reconnection logic
- // which involves us clearing all the connections' consumers
- // dispatch lists and clearing them
- // so rather than trying to grab a mutex (which could be already
- // owned by the message listener calling the send) we will just set
- // a flag so that the list can be cleared as soon as the
- // dispatch thread is ready to flush the dispatch list
- this.clearDispatchList = true;
+ inProgressClearRequiredFlag = true;
+ // deal with delivered messages async to avoid lock contention with in progress acks
+ clearDispatchList = true;
+ }
+
+ internal void ClearMessagesInProgress()
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ // Called from a thread in the ThreadPool, so we wait until we can
+ // get a lock on the unconsumed list then we clear it.
+ lock(this.unconsumedMessages)
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+ this.unconsumedMessages.Count + ") on transport interrupt");
+ }
+
+ this.unconsumedMessages.Clear();
+ this.synchronizationRegistered = false;
+
+ // allow dispatch on this connection to resume
+ this.session.Connection.TransportInterruptionProcessingComplete();
+ this.inProgressClearRequiredFlag = false;
+ }
+ }
+ }
}
public void DeliverAcks()
@@ -744,8 +767,6 @@ namespace Apache.NMS.Stomp
redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
-// MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
foreach(MessageDispatch dispatch in this.dispatchedMessages)
{
// Allow the message to update its internal to reflect a Rollback.
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Thu Jul 8 15:27:15 2010
@@ -75,7 +75,7 @@ namespace Apache.NMS.Stomp
this.executor = new SessionExecutor(this, this.consumers);
}
-
+
~Session()
{
Dispose(false);
@@ -757,15 +757,33 @@ namespace Apache.NMS.Stomp
this.executor.ClearMessagesInProgress();
}
+ if(Transacted)
+ {
+ this.transactionContext.ResetTransactionInProgress();
+ }
+
lock(this.consumers.SyncRoot)
{
foreach(MessageConsumer consumer in this.consumers)
{
- consumer.ClearMessagesInProgress();
+ consumer.InProgressClearRequired();
+ ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
}
}
}
+ private void ClearMessages(object value)
+ {
+ MessageConsumer consumer = value as MessageConsumer;
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Performing Async Clear of In Progress Messages for Consumer: " + consumer.ConsumerId);
+ }
+
+ consumer.ClearMessagesInProgress();
+ }
+
internal void Acknowledge()
{
lock(this.consumers.SyncRoot)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Thu Jul 8 15:27:15 2010
@@ -26,17 +26,27 @@ namespace Apache.NMS.Stomp.State
return null;
}
+ public virtual Response processAddSession(SessionInfo info)
+ {
+ return null;
+ }
+
public virtual Response processAddConsumer(ConsumerInfo info)
{
return null;
}
+ public virtual Response processAddProducer(ProducerInfo info)
+ {
+ return null;
+ }
+
public virtual Response processKeepAliveInfo(KeepAliveInfo info)
{
return null;
}
- public virtual Response processMessage(Message send)
+ public virtual Response processMessage(BaseMessage send)
{
return null;
}
@@ -51,11 +61,21 @@ namespace Apache.NMS.Stomp.State
return null;
}
+ public virtual Response processRemoveSession(SessionId id)
+ {
+ return null;
+ }
+
public virtual Response processRemoveConsumer(ConsumerId id)
{
return null;
}
+ public virtual Response processRemoveProducer(ProducerId id)
+ {
+ return null;
+ }
+
public virtual Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info)
{
return null;
@@ -81,5 +101,20 @@ namespace Apache.NMS.Stomp.State
return null;
}
+ public virtual Response processBeginTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processCommitTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
+ public virtual Response processRollbackTransaction(TransactionInfo info)
+ {
+ return null;
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/State/ICommandVisitor.cs Thu Jul 8 15:27:15 2010
@@ -24,15 +24,23 @@ namespace Apache.NMS.Stomp.State
{
Response processAddConnection(ConnectionInfo info);
+ Response processAddSession(SessionInfo info);
+
Response processAddConsumer(ConsumerInfo info);
+ Response processAddProducer(ProducerInfo info);
+
Response processRemoveConnection(ConnectionId id);
+ Response processRemoveSession(SessionId id);
+
Response processRemoveConsumer(ConsumerId id);
+ Response processRemoveProducer(ProducerId id);
+
Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info);
- Response processMessage(Message send);
+ Response processMessage(BaseMessage send);
Response processMessageAck(MessageAck ack);
@@ -45,5 +53,12 @@ namespace Apache.NMS.Stomp.State
Response processConnectionError(ConnectionError error);
Response processResponse(Response response);
+
+ Response processBeginTransaction(TransactionInfo info);
+
+ Response processCommitTransaction(TransactionInfo info);
+
+ Response processRollbackTransaction(TransactionInfo info);
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs?rev=961805&r1=961804&r2=961805&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/TransactionContext.cs Thu Jul 8 15:27:15 2010
@@ -62,6 +62,15 @@ namespace Apache.NMS.Stomp
synchronizations.Remove(synchronization);
}
+ public void ResetTransactionInProgress()
+ {
+ if(InTransaction)
+ {
+ this.transactionId = null;
+ this.synchronizations.Clear();
+ }
+ }
+
public void Begin()
{
if(!InTransaction)