You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/18 23:36:12 UTC

svn commit: r1552134 - in /activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp: Connection.cs MessageConsumer.cs MessageProducer.cs Session.cs SessionClosedException.cs

Author: tabish
Date: Wed Dec 18 22:36:11 2013
New Revision: 1552134

URL: http://svn.apache.org/r1552134
Log:
https://issues.apache.org/jira/browse/AMQNET-454

Apply patch https://issues.apache.org/jira/secure/attachment/12619423/Apache.NMS.AMQP-qpid-object-lifecycle-02.patch

Added:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Wed Dec 18 22:36:11 2013
@@ -50,7 +50,7 @@ namespace Apache.NMS.Amqp
         private int sessionCounter = 0; 
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
 
-        Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
+        private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
 
         /// <summary>
         /// Creates new connection
@@ -81,7 +81,7 @@ namespace Apache.NMS.Amqp
                 {
                     foreach (Session session in sessions)
                     {
-                        //session.Start();
+                        session.Start();
                     }
                 }
             }
@@ -336,7 +336,44 @@ namespace Apache.NMS.Amqp
 
         public void Close()
         {
-            Dispose();
+            if (!this.closed.Value)
+            {
+                this.Stop();
+            }
+
+            lock (connectedLock)
+            {
+                if (this.closed.Value)
+                {
+                    return;
+                }
+
+                try
+                {
+                    Tracer.InfoFormat("Connection[]: Closing Connection Now.");
+                    this.closing.Value = true;
+
+                    lock (sessions.SyncRoot)
+                    {
+                        foreach (Session session in sessions)
+                        {
+                            session.Shutdown();
+                        }
+                    }
+                    sessions.Clear();
+
+                }
+                catch (Exception ex)
+                {
+                    Tracer.ErrorFormat("Connection[]: Error during connection close: {0}", ex);
+                }
+                finally
+                {
+                    this.closed.Value = true;
+                    this.connected.Value = false;
+                    this.closing.Value = false;
+                }
+            }
         }
 
         public void PurgeTempDestinations()
@@ -359,5 +396,15 @@ namespace Apache.NMS.Amqp
         {
             return Interlocked.Increment(ref sessionCounter);
         }
+
+        public Org.Apache.Qpid.Messaging.Session CreateQpidSession()
+        {
+            // TODO: Session name; transactional session
+            if (!connected.Value)
+            {
+                throw new ConnectionClosedException();
+            }
+            return qpidConnection.CreateSession();
+        }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Wed Dec 18 22:36:11 2013
@@ -1,6 +1,3 @@
-using System;
-using Org.Apache.Qpid.Messaging;
-using System.Threading;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,7 +14,11 @@ using System.Threading;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
+using System.Threading;
 using Apache.NMS.Util;
+using Org.Apache.Qpid.Messaging;
 
 namespace Apache.NMS.Amqp
 {
@@ -26,6 +27,11 @@ namespace Apache.NMS.Amqp
     /// </summary>
     public class MessageConsumer : IMessageConsumer
     {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+
         protected TimeSpan zeroTimeout = new TimeSpan(0);
 
         private readonly Session session;
@@ -38,6 +44,8 @@ namespace Apache.NMS.Amqp
         private AutoResetEvent pause = new AutoResetEvent(false);
         private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
 
+        private readonly Atomic<bool> started = new Atomic<bool>(false); 
+        private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null;
 
         private ConsumerTransformerDelegate consumerTransformer;
         public ConsumerTransformerDelegate ConsumerTransformer
@@ -54,6 +62,28 @@ namespace Apache.NMS.Amqp
             this.acknowledgementMode = acknowledgementMode;
         }
 
+        public void Start()
+        {
+            // Don't try creating receiver if session not yet started
+            if (!session.IsStarted)
+            {
+                throw new SessionClosedException();
+            }
+
+            if (started.CompareAndSet(false, true))
+            {
+                try
+                {
+                    // Create qpid receiver
+                    qpidReceiver = session.CreateQpidReceiver("");
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new NMSException("Failed to create Qpid Receiver : " + e.Message);
+                }
+            }
+        }
+
         public event MessageListener Listener
         {
             add

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Wed Dec 18 22:36:11 2013
@@ -14,7 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System;
+using System.Threading;
+using Apache.NMS.Util;
 using Org.Apache.Qpid.Messaging;
 
 namespace Apache.NMS.Amqp
@@ -24,8 +27,13 @@ namespace Apache.NMS.Amqp
     /// </summary>
     public class MessageProducer : IMessageProducer
     {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
 
         private readonly Session session;
+        private readonly int id; 
         private Destination destination;
 
         //private long messageCounter;
@@ -34,10 +42,12 @@ namespace Apache.NMS.Amqp
         private MsgPriority priority;
         private bool disableMessageID;
         private bool disableMessageTimestamp;
-        private readonly int id;
 
         //private IMessageConverter messageConverter;
 
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+        private Org.Apache.Qpid.Messaging.Sender qpidSender = null;
+
         private ProducerTransformerDelegate producerTransformer;
         public ProducerTransformerDelegate ProducerTransformer
         {
@@ -52,6 +62,28 @@ namespace Apache.NMS.Amqp
             this.destination = destination;
         }
 
+        public void Start()
+        {
+            // Don't try creating sender if session not yet started
+            if (!session.IsStarted)
+            {
+                throw new SessionClosedException();
+            }
+
+            if (started.CompareAndSet(false, true))
+            {
+                try
+                {
+                    // Create qpid sender
+                    qpidSender = session.CreateQpidSender("");
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new NMSException("Failed to create Qpid Sender : " + e.Message);
+                }
+            }
+        }
+
         public void Send(IMessage message)
         {
             Send(Destination, message);

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Wed Dec 18 22:36:11 2013
@@ -17,6 +17,7 @@
 using System;
 using System.Collections;
 using System.Threading;
+using Apache.NMS.Util;
 using Org.Apache.Qpid.Messaging;
 
 namespace Apache.NMS.Amqp
@@ -43,6 +44,8 @@ namespace Apache.NMS.Amqp
         private int producerCounter;
         private long nextDeliveryId;
         private long lastDeliveredSequenceId;
+        private readonly object sessionLock = new object();
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
         protected bool disposed = false;
         protected bool closed = false;
         protected bool closing = false;
@@ -50,6 +53,8 @@ namespace Apache.NMS.Amqp
         private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
         private TimeSpan requestTimeout;
 
+        private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start()
+
         public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
         {
             this.connection = connection;
@@ -61,6 +66,58 @@ namespace Apache.NMS.Amqp
                 // TODO: transactions
                 throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
             }
+            if (connection.IsStarted)
+            {
+                this.Start();
+            }
+            connection.AddSession(this);
+        }
+
+        /// <summary>
+        /// Create new unmanaged session and start senders and receivers.
+        /// Associated connection must be open.
+        /// </summary>
+        public void Start()
+        {
+            // Don't try creating session if connection not yet up
+            if (!connection.IsStarted)
+            {
+                throw new ConnectionClosedException();
+            }
+
+            if (started.CompareAndSet(false, true))
+            {
+                try
+                {
+                    // Create qpid session
+                    qpidSession = connection.CreateQpidSession();
+
+                    // Start producers and consumers
+                    lock (producers.SyncRoot)
+                    {
+                        foreach (MessageProducer producer in producers.Values)
+                        {
+                            producer.Start();
+                        }
+                    }
+                    lock (consumers.SyncRoot)
+                    {
+                        foreach (MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.Start();
+                        }
+                    }
+                }
+                catch (Org.Apache.Qpid.Messaging.QpidException e)
+                {
+                    throw new SessionClosedException( "Failed to create session : " + e.Message );
+                }
+            }
+        }
+
+        public bool IsStarted
+        {
+            get { return started.Value; }
         }
 
         public void Dispose()
@@ -136,7 +193,7 @@ namespace Apache.NMS.Amqp
                     {
                         foreach (MessageConsumer consumer in consumers.Values)
                         {
-                            consumer.Shutdown();
+                            consumer.Close();
                         }
                     }
                     consumers.Clear();
@@ -145,7 +202,7 @@ namespace Apache.NMS.Amqp
                     {
                         foreach (MessageProducer producer in producers.Values)
                         {
-                            producer.Shutdown();
+                            producer.Close();
                         }
                     }
                     producers.Clear();
@@ -463,7 +520,26 @@ namespace Apache.NMS.Amqp
         {
             get { return id; }
         }
-        
+
+
+        public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(string address)
+        {
+            if (!IsStarted)
+            {
+                throw new SessionClosedException();
+            }
+            return qpidSession.CreateReceiver(address);
+        }
+
+        public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(string address)
+        {
+            if (!IsStarted)
+            {
+                throw new SessionClosedException();
+            }
+            return qpidSession.CreateSender(address);
+        }
+
         #region Transaction State Events
 
         public event SessionTxEventDelegate TransactionStartedListener;

Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs?rev=1552134&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs Wed Dec 18 22:36:11 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Amqp
+{
+	/// <summary>
+	/// Exception thrown when a session is used that is already closed
+	/// </summary>
+	[Serializable]
+	public class SessionClosedException : NMSException
+	{
+		public SessionClosedException()
+			: base("The session is already closed!")
+		{
+		}
+
+		public SessionClosedException(string message)
+			: base(message)
+		{
+		}
+
+		public SessionClosedException(string message, string errorCode)
+			: base(message, errorCode)
+		{
+		}
+
+		public SessionClosedException(string message, Exception innerException)
+			: base(message, innerException)
+		{
+		}
+
+		public SessionClosedException(string message, string errorCode, Exception innerException)
+			: base(message, errorCode, innerException)
+		{
+		}
+
+		#region ISerializable interface implementation
+
+		/// <summary>
+		/// Initializes a new instance of the SessionClosedException class with serialized data.
+		/// Throws System.ArgumentNullException if the info parameter is null.
+		/// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+		/// </summary>
+		/// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+		/// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+		protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+			: base(info, context)
+		{
+		}
+
+		#endregion
+	}
+}
\ No newline at end of file

Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs
------------------------------------------------------------------------------
    svn:eol-style = native