You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/10/27 19:24:36 UTC

svn commit: r468465 - in /incubator/activemq/activemq-dotnet/trunk/src/main/csharp: ActiveMQ/ ActiveMQ/Util/ NMS/

Author: jstrachan
Date: Fri Oct 27 10:24:35 2006
New Revision: 468465

URL: http://svn.apache.org/viewvc?view=rev&rev=468465
Log:
Applied patch from Rob Lugt for AMQ-999 to fix the threading of the ActiveMQ.Net client. Many thanks Rob!

Added:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs
Modified:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Fri Oct 27 10:24:35 2006
@@ -40,6 +40,7 @@
         private long temporaryDestinationCounter;
         private long localTransactionCounter;
         private bool closing;
+        private Util.AtomicBoolean started = new ActiveMQ.Util.AtomicBoolean(true);
         
         public Connection(ITransport transport, ConnectionInfo info)
         {
@@ -51,21 +52,44 @@
         }
         
         public event ExceptionListener ExceptionListener;
-        
-        /// <summary>
-        /// Starts message delivery for this connection.
-        /// </summary>
-        public void Start()
-        {
-        }
-        
-        
-        /// <summary>
-        /// Stop message delivery for this connection.
-        /// </summary>
-        public void Stop()
-        {
-        }
+
+
+		public bool IsStarted
+		{
+			get { return started.Value; }
+		}
+
+		/// <summary>
+		/// Starts asynchronous message delivery of incoming messages for this connection. 
+		/// Synchronous delivery is unaffected.
+		/// </summary>
+		public void Start()
+		{
+			CheckConnected();
+			if (started.compareAndSet(false, true)) 
+			{
+				foreach(Session session in sessions)
+				{
+					session.StartAsyncDelivery(null);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Temporarily stop asynchronous delivery of inbound messages for this connection.
+		/// The sending of outbound messages is unaffected.
+		/// </summary>
+		public void Stop()
+		{
+			CheckConnected();
+			if (started.compareAndSet(true, false)) 
+			{
+				foreach(Session session in sessions)
+				{
+					session.StopAsyncDelivery();
+				}
+			}
+		}
         
         /// <summary>
         /// Creates a new session to work on this connection
@@ -86,21 +110,38 @@
             sessions.Add(session);
             return session;
         }
-        
+
+		public void Close()
+		{
+			if (!closed)
+			{
+				closing = true;
+				foreach (Session session in sessions)
+				{
+					session.Close();
+				}
+				sessions.Clear();
+				try
+				{
+					DisposeOf(ConnectionId);
+					transport.Oneway(new ShutdownInfo());
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during connection close: {0}", ex);
+				}
+				transport.Dispose();
+				transport = null;
+				closed = true;
+			}
+		}
+
         public void Dispose()
         {
-            /*
-            foreach (Session session in sessions)
-            {
-                session.Dispose();
-            }
-            */
-            closing = true;
-            DisposeOf(ConnectionId);
-            sessions.Clear();
-			transport.Oneway(new ShutdownInfo());
-            transport.Dispose();
-            closed = true;
+			// For now we do not distinguish between Dispose() and Close().
+			// In theory Dispose should possibly be lighter-weight and perform a (faster)
+			// disorderly close.
+			Close();
         }
         
         // Properties
@@ -178,7 +219,7 @@
         {
             RemoveInfo command = new RemoveInfo();
             command.ObjectId = objectId;
-            SyncRequest(command);
+            transport.Oneway(command);
         }
         
         
@@ -292,6 +333,13 @@
             if (ExceptionListener != null)
                 ExceptionListener(exception);
         }
+
+		internal void OnSessionException(Session sender, Exception exception)
+		{
+			Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
+			if (ExceptionListener != null)
+				ExceptionListener(exception);
+		}
         
         protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
         {

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Dispatcher.cs Fri Oct 27 10:24:35 2006
@@ -31,8 +31,23 @@
         Queue queue = new Queue();
         Object semaphore = new Object();
         ArrayList messagesToRedeliver = new ArrayList();
-        readonly AutoResetEvent resetEvent = new AutoResetEvent(false);
-		
+        
+        // TODO can't use EventWaitHandle on MONO 1.0
+        AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
+        bool m_bAsyncDelivery = false;
+        bool m_bClosed = false;
+
+		public void SetAsyncDelivery(AutoResetEvent eventHandle)
+		{
+			lock (semaphore)
+			{
+				messageReceivedEventHandle = eventHandle;
+				m_bAsyncDelivery = true;
+				if (queue.Count > 0)
+					messageReceivedEventHandle.Set();
+			}
+		}
+
         /// <summary>
         /// Whem we start a transaction we must redeliver any rolled back messages
         /// </summary>
@@ -42,7 +57,7 @@
             {
                 Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
                 foreach (ActiveMQMessage element in messagesToRedeliver)
-				{
+                {
                     replacement.Enqueue(element);
                 }
                 messagesToRedeliver.Clear();
@@ -54,7 +69,7 @@
                 }
                 queue = replacement;
                 if (queue.Count > 0)
-                    resetEvent.Set();
+                    messageReceivedEventHandle.Set();
             }
         }
         
@@ -77,7 +92,7 @@
             lock (semaphore)
             {
                 queue.Enqueue(message);
-                resetEvent.Set();
+                messageReceivedEventHandle.Set();
             }
         }
         
@@ -89,13 +104,9 @@
             IMessage rc = null;
             lock (semaphore)
             {
-                if (queue.Count > 0)
+                if (!m_bClosed && queue.Count > 0)
                 {
                     rc = (IMessage) queue.Dequeue();
-                    if (queue.Count > 0)
-                    {
-                        resetEvent.Set();
-                    }
                 } 
             }
             return rc;
@@ -106,14 +117,25 @@
         /// </summary>
         public IMessage Dequeue(TimeSpan timeout)
         {
-            IMessage rc = DequeueNoWait();
-            while (rc == null)
+            IMessage rc;
+			bool bClosed = false;
+			lock (semaphore)
+			{
+				bClosed = m_bClosed;
+				rc = DequeueNoWait();
+			}
+
+            while (!bClosed && rc == null)
             {
-                if( !resetEvent.WaitOne((int)timeout.TotalMilliseconds, false) )
+                if( !messageReceivedEventHandle.WaitOne((int)timeout.TotalMilliseconds, false) )
                 {
                     break;
                 }
-                rc = DequeueNoWait();
+				lock (semaphore)
+				{
+					rc = DequeueNoWait();
+					bClosed = m_bClosed;
+				}
             }
             return rc;
         }
@@ -123,18 +145,17 @@
         /// </summary>
         public IMessage Dequeue()
         {
-            IMessage rc = DequeueNoWait();
-            while (rc == null)
-            {
-                if (!resetEvent.WaitOne(-1, false))
-                {
-                    break;
-                }
-                rc = DequeueNoWait();
-            }
-            return rc;
+			return Dequeue(TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); // -1 ms => WaitOne(-1) waits forever; TimeSpan.MaxValue overflows the (int) cast in Dequeue(TimeSpan)
         }
-        
-    }
-}
 
+		internal void Close()
+		{
+			lock (semaphore)
+			{
+				m_bClosed = true;
+				if(m_bAsyncDelivery)
+					messageReceivedEventHandle.Set();
+			}
+		}
+	}
+}

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs?view=auto&rev=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/DispatchingThread.cs Fri Oct 27 10:24:35 2006
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+
+namespace ActiveMQ
+{
+	internal class DispatchingThread
+	{
+		public delegate void DispatchFunction();
+		public delegate void ExceptionHandler(Exception exception);
+
+		private readonly AutoResetEvent m_event = new AutoResetEvent(false);
+		private bool m_bStopFlag = false;
+		private Thread m_thread = null;
+		private readonly DispatchFunction m_dispatchFunc;
+		private event ExceptionHandler m_exceptionListener;
+
+		public DispatchingThread(DispatchFunction dispatchFunc)
+		{
+			m_dispatchFunc = dispatchFunc;
+		}
+
+               // TODO can't use EventWaitHandle on MONO 1.0
+		public AutoResetEvent EventHandle
+		{
+			get { return m_event; }
+		}
+
+		internal event ExceptionHandler ExceptionListener
+		{
+			add
+			{
+				m_exceptionListener += value;
+			}
+			remove 
+			{
+				m_exceptionListener -= value;
+			}
+		}
+
+		internal void Start()
+		{
+			lock (this)
+			{
+				if (m_thread == null)
+				{
+					m_bStopFlag = false;
+					m_thread = new Thread(new ThreadStart(MyThreadFunc));
+					m_event.Set();
+					Tracer.Info("Starting dispatcher thread for session");
+					m_thread.Start();
+				}
+			}
+		}
+
+		internal void Stop()
+		{
+			Stop(System.Threading.Timeout.Infinite);
+		}
+
+		
+		internal void Stop(int timeoutMilliseconds)
+		{
+			Tracer.Info("Stopping dispatcher thread for session");
+			Thread localThread = null;
+			lock (this)
+			{
+				localThread = m_thread;
+				m_thread = null;
+				if (!m_bStopFlag)
+				{
+					m_bStopFlag = true;
+					m_event.Set();
+				}
+			}
+			if(localThread!=null)
+			{
+				localThread.Join(timeoutMilliseconds);
+			}
+			Tracer.Info("Dispatcher thread joined");
+		}
+		
+		private void MyThreadFunc()
+		{
+			Tracer.Info("Dispatcher thread started");
+			while (true) // loop forever (well, at least until we've been asked to stop)
+			{
+				lock (this)
+				{
+					if (m_bStopFlag)
+						break;
+				}
+
+				try
+				{
+					m_dispatchFunc();
+				}
+				catch (Exception ex)
+				{
+					if (m_exceptionListener != null)
+						m_exceptionListener(ex);
+				}
+				m_event.WaitOne();
+			}
+			Tracer.Info("Dispatcher thread stopped");
+		}
+	}
+}
\ No newline at end of file

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs Fri Oct 27 10:24:35 2006
@@ -38,7 +38,7 @@
         private Session session;
         private ConsumerInfo info;
         private AcknowledgementMode acknowledgementMode;
-        private bool closed;
+        private bool closed = false;
         private Dispatcher dispatcher = new Dispatcher();
         private int maximumRedeliveryCount = 10;
         private int redeliveryTimeout = 500;
@@ -48,21 +48,26 @@
         {
               add {
                   listener += value;
-                  FireAsyncDispatchOfMessages();
+                  session.StartAsyncDelivery(dispatcher);
               }
               remove {
                   listener -= value;
               }
         }
         
-        
-        public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
+        // Constructor internal to prevent clients from creating an instance.
+        internal MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.session = session;
             this.info = info;
             this.acknowledgementMode = acknowledgementMode;
         }
         
+        internal Dispatcher Dispatcher
+        {
+            get { return this.dispatcher; }
+        }
+
         public ConsumerId ConsumerId
         {
             get {
@@ -95,19 +100,8 @@
         public void Dispatch(ActiveMQMessage message)
         {
             dispatcher.Enqueue(message);
-            
-            if (listener != null)
-            {
-                FireAsyncDispatchOfMessages();
-            }
         }
 
-        protected void FireAsyncDispatchOfMessages() 
-        {
-              // lets dispatch to the thread pool for this connection for messages to be processed
-              ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
-        }
-        
         public IMessage Receive()
         {
             CheckClosed();
@@ -126,37 +120,27 @@
             return AutoAcknowledge(dispatcher.DequeueNoWait());
         }
         
-        
-        
         public void Dispose()
         {
             session.DisposeOf(info.ConsumerId);
-            closed = true;
+            Close();
         }
         
         /// <summary>
         /// Dispatch any pending messages to the asynchronous listener
         /// </summary>
-        public void DispatchAsyncMessages()
+        internal void DispatchAsyncMessages()
         {
             while (listener != null)
             {
                 IMessage message = dispatcher.DequeueNoWait();
-                if (message != null)
-                {
-                   //here we add the code that if do acknowledge action.
-                   message = AutoAcknowledge(message);
-                   try
-                   {
-                       listener(message);
-                   } catch(Exception e)
-                   {
-                       // TODO: what do do if the listener errors out?
-                   }
-                }
-
-                // lets now break to give the acknowledgement a chance to be processed
-                break;
+                if (message == null)
+                    break;
+                    
+                // Run the message through AutoAcknowledge before handing it to the listener.
+                message = AutoAcknowledge(message);
+                // invoke listener. Exceptions caught by the dispatcher thread
+                listener(message); 
             }
         }
         
@@ -239,16 +223,27 @@
             else
             {
                 dispatcher.Redeliver(message);
-                
-                if (listener != null)
-                {
-                    // lets re-dispatch the message at some point in the future
-                    Thread.Sleep(RedeliveryTimeout);
-                    ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
-                }
             }
         }
-    }
+
+		public void Close()
+		{
+			lock(this)
+			{
+				if(closed)
+					return;
+			}
+
+			// wake up any pending dequeue() call on the dispatcher
+			dispatcher.Close();
+
+			lock (this)
+			{
+				closed = true;
+			}
+		}
+	}
+
     
     // TODO maybe there's a cleaner way of creating stateful delegates to make this code neater
     class MessageConsumerSynchronization : ISynchronization

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Fri Oct 27 10:24:35 2006
@@ -38,13 +38,21 @@
                 private bool retroactive;
                 private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
                 private TransactionContext transactionContext;
-
+                private DispatchingThread dispatchingThread;
+                
                 public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
                 {
                         this.connection = connection;
                         this.info = info;
                         this.acknowledgementMode = acknowledgementMode;
                         transactionContext = new TransactionContext(this);
+                        dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+                        dispatchingThread.ExceptionListener += new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+                }
+
+                void dispatchingThread_ExceptionListener(Exception exception)
+                {
+                        connection.OnSessionException(this, exception);
                 }
 
 
@@ -317,6 +325,12 @@
                         connection.SyncRequest(command);
                 }
 
+                public void Close()
+                {
+                        // To do: what about session id?
+                        StopAsyncDelivery();
+                }
+                
                 /// <summary>
                 /// Ensures that a transaction is started
                 /// </summary>
@@ -335,18 +349,17 @@
                         connection.DisposeOf(objectId);
                 }
 
-                public void DispatchAsyncMessages(object state)
+                /// <summary>
+                /// Private method called by the dispatcher thread in order to perform
+                /// asynchronous delivery of queued (inbound) messages.
+                /// </summary>
+                private void DispatchAsyncMessages()
                 {
                         // lets iterate through each consumer created by this session
                         // ensuring that they have all pending messages dispatched
-                        lock (this)
+                        foreach (MessageConsumer consumer in GetConsumers())
                         {
-                                // lets ensure that only 1 thread dispatches messages in a consumer at once
-
-                                foreach (MessageConsumer consumer in GetConsumers())
-                                {
-                                        consumer.DispatchAsyncMessages();
-                                }
+                                consumer.DispatchAsyncMessages();
                         }
                 }
 
@@ -425,5 +438,17 @@
                 protected void Configure(ActiveMQMessage message)
                 {
                 }
+
+				internal void StopAsyncDelivery()
+				{
+					dispatchingThread.Stop();
+				}
+
+				internal void StartAsyncDelivery(Dispatcher dispatcher)
+				{
+					if(dispatcher != null)
+						dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
+					dispatchingThread.Start();
+				}
         }
 }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Fri Oct 27 10:24:35 2006
@@ -23,6 +23,17 @@
     {
         bool value;
 
+		public bool Value
+		{
+			get
+			{
+				lock (this)
+				{
+					return value;
+				}
+			}
+		}
+
         public AtomicBoolean(bool b)
         {
             value = b;

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IConnection.cs Fri Oct 27 10:24:35 2006
@@ -93,5 +93,11 @@
                 /// An asynchronous listener which can be notified if an error occurs
                 /// </summary>
                 event ExceptionListener ExceptionListener;
+
+                /// <summary>
+                /// Closes the connection.
+                /// </summary>
+                void Close();
+
         }
 }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/IMessageConsumer.cs Fri Oct 27 10:24:35 2006
@@ -46,6 +46,16 @@
         /// An asynchronous listener which can be used to consume messages asynchronously
         /// </summary>
         event MessageListener Listener;
+
+        /// <summary>
+        /// Closes the message consumer. 
+        /// </summary>
+        /// <remarks>
+        /// Clients should close message consumers when they are no longer needed.
+        /// This call blocks until a receive or message listener in progress has completed.
+        /// A blocked message consumer receive call returns null when this message consumer is closed.
+        /// </remarks>
+        void Close();
     }
 }
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs?view=diff&rev=468465&r1=468464&r2=468465
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs Fri Oct 27 10:24:35 2006
@@ -101,6 +101,11 @@
                 /// </summary>
                 IBytesMessage CreateBytesMessage(byte[] body);
 
+                /// <summary>
+                /// Closes the session.  There is no need to close the producers and consumers
+                /// of a closed session.
+                /// </summary>
+                void Close();
 
                 // Transaction methods
 
@@ -115,6 +120,5 @@
                 /// send and acknowledgements for producers and consumers in this session
                 /// </summary>
                 void Rollback();
-
         }
 }