You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/20 23:32:53 UTC

svn commit: r1552832 - in /activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp: Connection.cs Destination.cs MessageConsumer.cs MessageProducer.cs Session.cs

Author: tabish
Date: Fri Dec 20 22:32:52 2013
New Revision: 1552832

URL: http://svn.apache.org/r1552832
Log:
https://issues.apache.org/jira/browse/AMQNET-454

apply patch: https://issues.apache.org/jira/secure/attachment/12619923/Apache.NMS.AMQP-object-lifecycle-04.patch

Modified:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Fri Dec 20 22:32:52 2013
@@ -69,18 +69,22 @@ namespace Apache.NMS.Amqp
             Dispose(false);
         }
 
+        #region IStartable Members
         /// <summary>
         /// Starts message delivery for this connection.
         /// </summary>
         public void Start()
         {
+            // Create and open qpidConnection
             CheckConnected();
+
             if (started.CompareAndSet(false, true))
             {
                 lock (sessions.SyncRoot)
                 {
                     foreach (Session session in sessions)
                     {
+                        // Create and start qpidSessions
                         session.Start();
                     }
                 }
@@ -95,25 +99,38 @@ namespace Apache.NMS.Amqp
         {
             get { return started.Value; }
         }
+        #endregion
 
+        #region IStoppable Members
         /// <summary>
         /// Temporarily stop asynchronous delivery of inbound messages for this connection.
         /// The sending of outbound messages is unaffected.
         /// </summary>
         public void Stop()
         {
+            // Close qpidConnection
+            CheckDisconnected();
+
+            // Administratively close NMS objects
             if (started.CompareAndSet(true, false))
             {
-                lock (sessions.SyncRoot)
+                foreach (Session session in sessions)
                 {
-                    foreach (Session session in sessions)
-                    {
-                        //session.Stop();
-                    }
+                    // Stop each session: stops its producers/consumers and disposes its qpidSession
+                    session.Stop();
                 }
             }
         }
+        #endregion
 
+        #region IDisposable Methods
+        public void Dispose()
+        {
+            Dispose(true);
+        }
+        #endregion
+
+        #region AMQP Connection Class Methods
         /// <summary>
         /// Creates a new session to work on this connection
         /// </summary>
@@ -147,12 +164,6 @@ namespace Apache.NMS.Amqp
             }
         }
 
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
         protected void Dispose(bool disposing)
         {
             if (disposed)
@@ -298,11 +309,17 @@ namespace Apache.NMS.Amqp
                         try
                         {
                             // TODO: embellish the brokerUri with other connection options
-                            // Allocate a new Qpid connection
-                            qpidConnection = new Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString());
+                            // Allocate a Qpid connection
+                            if (qpidConnection == null)
+                            {
+                                qpidConnection = new Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString());
+                            }
                             
                             // Open the connection
-                            qpidConnection.Open();
+                            if (!qpidConnection.IsOpen)
+                            {
+                                qpidConnection.Open();
+                            }
 
                             connected.Value = true;
                         }
@@ -334,6 +351,54 @@ namespace Apache.NMS.Amqp
             }
         }
 
+
+        /// <summary>
+        /// Check and ensure that the connection object is disconnected
+        /// Open connections are closed and this closes related sessions, senders, and receivers.
+        /// Closed connections may be restarted with subsequent calls to Start().
+        /// </summary>
+        internal void CheckDisconnected()
+        {
+            if (closed.Value || closing.Value)
+            {
+                throw new ConnectionClosedException();
+            }
+            if (!connected.Value)
+            {
+                return;
+            }
+            while (connected.Value && !closed.Value && !closing.Value)
+            {
+                if (Monitor.TryEnter(connectedLock))
+                {
+                    try
+                    {
+                        // Close the connection
+                        if (qpidConnection.IsOpen)
+                        {
+                            qpidConnection.Close();
+                        }
+
+                        connected.Value = false;
+                        break;
+                    }
+                    catch (Org.Apache.Qpid.Messaging.QpidException e)
+                    {
+                        throw new NMSException("AMQP Connection close failed : " + e.Message);
+                    }
+                    finally
+                    {
+                        Monitor.Exit(connectedLock);
+                    }
+                }
+            }
+
+            if (connected.Value)
+            {
+                throw new NMSException("Failed to close AMQP Connection");
+            }
+        }
+
         public void Close()
         {
             if (!this.closed.Value)
@@ -392,6 +457,7 @@ namespace Apache.NMS.Amqp
             }
         }
 
+
         public int GetNextSessionId()
         {
             return Interlocked.Increment(ref sessionCounter);
@@ -406,5 +472,6 @@ namespace Apache.NMS.Amqp
             }
             return qpidConnection.CreateSession();
         }
+        #endregion
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs Fri Dec 20 22:32:52 2013
@@ -49,12 +49,12 @@ namespace Apache.NMS.Amqp
 			set
 			{
 				this.path = value;
-				if(!this.path.Contains("\\"))
-				{
-					// Queues must have paths in them.  If no path specified, then
-					// default to local machine.
-					this.path = ".\\" + this.path;
-				}
+                //if(!this.path.Contains("\\"))
+                //{
+                //    // Queues must have paths in them.  If no path specified, then
+                //    // default to local machine.
+                //    this.path = ".\\" + this.path;
+                //}
 			}
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Fri Dec 20 22:32:52 2013
@@ -62,6 +62,7 @@ namespace Apache.NMS.Amqp
             this.acknowledgementMode = acknowledgementMode;
         }
 
+        #region IStartable Methods
         public void Start()
         {
             // Don't try creating session if connection not yet up
@@ -75,7 +76,11 @@ namespace Apache.NMS.Amqp
                 try
                 {
                     // Create qpid sender
-                    qpidReceiver = session.CreateQpidReceiver("");
+                    Console.WriteLine("Start Consumer Id = " + ConsumerId.ToString());
+                    if (qpidReceiver == null)
+                    {
+                        qpidReceiver = session.CreateQpidReceiver(destination.ToString());
+                    }
                 }
                 catch (Org.Apache.Qpid.Messaging.QpidException e)
                 {
@@ -84,6 +89,30 @@ namespace Apache.NMS.Amqp
             }
         }
 
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+        #endregion
+
+        #region IStoppable Methods
+        public void Stop()
+        {
+            if (started.CompareAndSet(true, false))
+            {
+                try
+                {
+                    qpidReceiver.Dispose();
+                    qpidReceiver = null;
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new NMSException("Failed to close session with Id " + ConsumerId.ToString() + " : " + e.Message);
+                }
+            }
+        }
+        #endregion
+
         public event MessageListener Listener
         {
             add

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Fri Dec 20 22:32:52 2013
@@ -62,6 +62,7 @@ namespace Apache.NMS.Amqp
             this.destination = destination;
         }
 
+        #region IStartable Methods
         public void Start()
         {
             // Don't try creating session if connection not yet up
@@ -75,7 +76,11 @@ namespace Apache.NMS.Amqp
                 try
                 {
                     // Create qpid sender
-                    qpidSender = session.CreateQpidSender("");
+                    Console.WriteLine("Start Producer Id = " + ProducerId.ToString()); 
+                    if (qpidSender == null)
+                    {
+                        qpidSender = session.CreateQpidSender(destination.ToString());
+                    }
                 }
                 catch (Org.Apache.Qpid.Messaging.QpidException e)
                 {
@@ -84,6 +89,31 @@ namespace Apache.NMS.Amqp
             }
         }
 
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+        #endregion
+
+        #region IStoppable Methods
+        public void Stop()
+        {
+            if (started.CompareAndSet(true, false))
+            {
+                try
+                {
+                    Console.WriteLine("Stop  Producer Id = " + ProducerId);
+                    qpidSender.Dispose();
+                    qpidSender = null;
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new NMSException("Failed to close session with Id " + ProducerId.ToString() + " : " + e.Message);
+                }
+            }
+        }
+        #endregion
+
         public void Send(IMessage message)
         {
             Send(Destination, message);

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Fri Dec 20 22:32:52 2013
@@ -25,7 +25,7 @@ namespace Apache.NMS.Amqp
     /// <summary>
     /// Amqp provider of ISession
     /// </summary>
-    public class Session : ISession
+    public class Session : ISession, IStartable, IStoppable
     {
         /// <summary>
         /// Private object used for synchronization, instead of public "this"
@@ -73,6 +73,7 @@ namespace Apache.NMS.Amqp
             connection.AddSession(this);
         }
 
+        #region IStartable Methods
         /// <summary>
         /// Create new unmanaged session and start senders and receivers
         /// Associated connection must be open.
@@ -90,7 +91,10 @@ namespace Apache.NMS.Amqp
                 try
                 {
                     // Create qpid session
-                    qpidSession = connection.CreateQpidSession();
+                    if (qpidSession == null)
+                    {
+                        qpidSession = connection.CreateQpidSession();
+                    }
 
                     // Start producers and consumers
                     lock (producers.SyncRoot)
@@ -119,12 +123,48 @@ namespace Apache.NMS.Amqp
         {
             get { return started.Value; }
         }
+        #endregion
+
+        #region IStoppable Methods
+        public void Stop()
+        {
+            if (started.CompareAndSet(true, false))
+            {
+                try
+                {
+                    lock (producers.SyncRoot)
+                    {
+                        foreach (MessageProducer producer in producers.Values)
+                        {
+                            producer.Stop();
+                        }
+                    }
+                    lock (consumers.SyncRoot)
+                    {
+                        foreach (MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.Stop();
+                        }
+                    }
+
+                    qpidSession.Dispose();
+                    qpidSession = null;
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new NMSException("Failed to close session with Id " + SessionId.ToString() + " : " + e.Message);
+                }
+            }
+        }
+        #endregion
 
+        #region IDisposable Methods
         public void Dispose()
         {
             Dispose(true);
-            GC.SuppressFinalize(this);
         }
+        #endregion
+
 
         protected void Dispose(bool disposing)
         {
@@ -228,15 +268,15 @@ namespace Apache.NMS.Amqp
 
         public IMessageProducer CreateProducer(IDestination destination)
         {
+            if (destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
             MessageProducer producer = null;
             try
             {
-                Destination dest = null;
-                if (destination != null)
-                {
-                    dest.Path = destination.ToString();
-                }
-                producer = DoCreateMessageProducer(dest);
+                Queue queue = new Queue(destination.ToString());
+                producer = DoCreateMessageProducer(queue);
 
                 this.AddProducer(producer);
             }
@@ -280,12 +320,8 @@ namespace Apache.NMS.Amqp
 
             try
             {
-                Destination dest = null;
-                if (destination != null)
-                {
-                    dest.Path = destination.ToString();
-                }
-                consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, acknowledgementMode);
+                Queue queue = new Queue(destination.ToString());
+                consumer = DoCreateMessageConsumer(GetNextConsumerId(), queue, acknowledgementMode);
 
                 consumer.ConsumerTransformer = this.ConsumerTransformer;