You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/20 23:32:53 UTC
svn commit: r1552832 - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp:
Connection.cs Destination.cs MessageConsumer.cs MessageProducer.cs Session.cs
Author: tabish
Date: Fri Dec 20 22:32:52 2013
New Revision: 1552832
URL: http://svn.apache.org/r1552832
Log:
https://issues.apache.org/jira/browse/AMQNET-454
apply patch: https://issues.apache.org/jira/secure/attachment/12619923/Apache.NMS.AMQP-object-lifecycle-04.patch
Modified:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Fri Dec 20 22:32:52 2013
@@ -69,18 +69,22 @@ namespace Apache.NMS.Amqp
Dispose(false);
}
+ #region IStartable Members
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
{
+ // Create and open qpidConnection
CheckConnected();
+
if (started.CompareAndSet(false, true))
{
lock (sessions.SyncRoot)
{
foreach (Session session in sessions)
{
+ // Create and start qpidSessions
session.Start();
}
}
@@ -95,25 +99,38 @@ namespace Apache.NMS.Amqp
{
get { return started.Value; }
}
+ #endregion
+ #region IStoppable Members
/// <summary>
/// Temporarily stop asynchronous delivery of inbound messages for this connection.
/// The sending of outbound messages is unaffected.
/// </summary>
public void Stop()
{
+ // Close qpidConnection
+ CheckDisconnected();
+
+ // Administratively close NMS objects
if (started.CompareAndSet(true, false))
{
- lock (sessions.SyncRoot)
+ foreach (Session session in sessions)
{
- foreach (Session session in sessions)
- {
- //session.Stop();
- }
+ // Stop each session, which stops its producers and consumers and disposes its qpidSession
+ session.Stop();
}
}
}
+ #endregion
+ #region IDisposable Methods
+ public void Dispose()
+ {
+ Dispose(true);
+ }
+ #endregion
+
+ #region AMQP Connection Class Methods
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
@@ -147,12 +164,6 @@ namespace Apache.NMS.Amqp
}
}
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
protected void Dispose(bool disposing)
{
if (disposed)
@@ -298,11 +309,17 @@ namespace Apache.NMS.Amqp
try
{
// TODO: embellish the brokerUri with other connection options
- // Allocate a new Qpid connection
- qpidConnection = new Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString());
+ // Allocate a Qpid connection
+ if (qpidConnection == null)
+ {
+ qpidConnection = new Org.Apache.Qpid.Messaging.Connection(brokerUri.ToString());
+ }
// Open the connection
- qpidConnection.Open();
+ if (!qpidConnection.IsOpen)
+ {
+ qpidConnection.Open();
+ }
connected.Value = true;
}
@@ -334,6 +351,54 @@ namespace Apache.NMS.Amqp
}
}
+
+ /// <summary>
+ /// Check and ensure that the connection object is disconnected.
+ /// An open connection is closed, which also closes its related sessions, senders, and receivers.
+ /// Closed connections may be restarted with subsequent calls to Start().
+ /// </summary>
+ internal void CheckDisconnected()
+ {
+ if (closed.Value || closing.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+ if (!connected.Value)
+ {
+ return;
+ }
+ while (connected.Value && !closed.Value && !closing.Value)
+ {
+ if (Monitor.TryEnter(connectedLock))
+ {
+ try
+ {
+ // Close the connection
+ if (qpidConnection.IsOpen)
+ {
+ qpidConnection.Close();
+ }
+
+ connected.Value = false;
+ break;
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("AMQP Connection close failed : " + e.Message);
+ }
+ finally
+ {
+ Monitor.Exit(connectedLock);
+ }
+ }
+ }
+
+ if (connected.Value)
+ {
+ throw new NMSException("Failed to close AMQP Connection");
+ }
+ }
+
public void Close()
{
if (!this.closed.Value)
@@ -392,6 +457,7 @@ namespace Apache.NMS.Amqp
}
}
+
public int GetNextSessionId()
{
return Interlocked.Increment(ref sessionCounter);
@@ -406,5 +472,6 @@ namespace Apache.NMS.Amqp
}
return qpidConnection.CreateSession();
}
+ #endregion
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Destination.cs Fri Dec 20 22:32:52 2013
@@ -49,12 +49,12 @@ namespace Apache.NMS.Amqp
set
{
this.path = value;
- if(!this.path.Contains("\\"))
- {
- // Queues must have paths in them. If no path specified, then
- // default to local machine.
- this.path = ".\\" + this.path;
- }
+ //if(!this.path.Contains("\\"))
+ //{
+ // // Queues must have paths in them. If no path specified, then
+ // // default to local machine.
+ // this.path = ".\\" + this.path;
+ //}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Fri Dec 20 22:32:52 2013
@@ -62,6 +62,7 @@ namespace Apache.NMS.Amqp
this.acknowledgementMode = acknowledgementMode;
}
+ #region IStartable Methods
public void Start()
{
// Don't try creating session if connection not yet up
@@ -75,7 +76,11 @@ namespace Apache.NMS.Amqp
try
{
// Create qpid receiver
- qpidReceiver = session.CreateQpidReceiver("");
+ Console.WriteLine("Start Consumer Id = " + ConsumerId.ToString());
+ if (qpidReceiver == null)
+ {
+ qpidReceiver = session.CreateQpidReceiver(destination.ToString());
+ }
}
catch (Org.Apache.Qpid.Messaging.QpidException e)
{
@@ -84,6 +89,30 @@ namespace Apache.NMS.Amqp
}
}
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+ #endregion
+
+ #region IStoppable Methods
+ public void Stop()
+ {
+ if (started.CompareAndSet(true, false))
+ {
+ try
+ {
+ qpidReceiver.Dispose();
+ qpidReceiver = null;
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to close session with Id " + ConsumerId.ToString() + " : " + e.Message);
+ }
+ }
+ }
+ #endregion
+
public event MessageListener Listener
{
add
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Fri Dec 20 22:32:52 2013
@@ -62,6 +62,7 @@ namespace Apache.NMS.Amqp
this.destination = destination;
}
+ #region IStartable Methods
public void Start()
{
// Don't try creating session if connection not yet up
@@ -75,7 +76,11 @@ namespace Apache.NMS.Amqp
try
{
// Create qpid sender
- qpidSender = session.CreateQpidSender("");
+ Console.WriteLine("Start Producer Id = " + ProducerId.ToString());
+ if (qpidSender == null)
+ {
+ qpidSender = session.CreateQpidSender(destination.ToString());
+ }
}
catch (Org.Apache.Qpid.Messaging.QpidException e)
{
@@ -84,6 +89,31 @@ namespace Apache.NMS.Amqp
}
}
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+ #endregion
+
+ #region IStoppable Methods
+ public void Stop()
+ {
+ if (started.CompareAndSet(true, false))
+ {
+ try
+ {
+ Console.WriteLine("Stop Producer Id = " + ProducerId);
+ qpidSender.Dispose();
+ qpidSender = null;
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to close session with Id " + ProducerId.ToString() + " : " + e.Message);
+ }
+ }
+ }
+ #endregion
+
public void Send(IMessage message)
{
Send(Destination, message);
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1552832&r1=1552831&r2=1552832&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Fri Dec 20 22:32:52 2013
@@ -25,7 +25,7 @@ namespace Apache.NMS.Amqp
/// <summary>
/// Amqp provider of ISession
/// </summary>
- public class Session : ISession
+ public class Session : ISession, IStartable, IStoppable
{
/// <summary>
/// Private object used for synchronization, instead of public "this"
@@ -73,6 +73,7 @@ namespace Apache.NMS.Amqp
connection.AddSession(this);
}
+ #region IStartable Methods
/// <summary>
/// Create new unmanaged session and start senders and receivers
/// Associated connection must be open.
@@ -90,7 +91,10 @@ namespace Apache.NMS.Amqp
try
{
// Create qpid session
- qpidSession = connection.CreateQpidSession();
+ if (qpidSession == null)
+ {
+ qpidSession = connection.CreateQpidSession();
+ }
// Start producers and consumers
lock (producers.SyncRoot)
@@ -119,12 +123,48 @@ namespace Apache.NMS.Amqp
{
get { return started.Value; }
}
+ #endregion
+
+ #region IStoppable Methods
+ public void Stop()
+ {
+ if (started.CompareAndSet(true, false))
+ {
+ try
+ {
+ lock (producers.SyncRoot)
+ {
+ foreach (MessageProducer producer in producers.Values)
+ {
+ producer.Stop();
+ }
+ }
+ lock (consumers.SyncRoot)
+ {
+ foreach (MessageConsumer consumer in consumers.Values)
+ {
+ consumer.Stop();
+ }
+ }
+
+ qpidSession.Dispose();
+ qpidSession = null;
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to close session with Id " + SessionId.ToString() + " : " + e.Message);
+ }
+ }
+ }
+ #endregion
+ #region IDisposable Methods
public void Dispose()
{
Dispose(true);
- GC.SuppressFinalize(this);
}
+ #endregion
+
protected void Dispose(bool disposing)
{
@@ -228,15 +268,15 @@ namespace Apache.NMS.Amqp
public IMessageProducer CreateProducer(IDestination destination)
{
+ if (destination == null)
+ {
+ throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+ }
MessageProducer producer = null;
try
{
- Destination dest = null;
- if (destination != null)
- {
- dest.Path = destination.ToString();
- }
- producer = DoCreateMessageProducer(dest);
+ Queue queue = new Queue(destination.ToString());
+ producer = DoCreateMessageProducer(queue);
this.AddProducer(producer);
}
@@ -280,12 +320,8 @@ namespace Apache.NMS.Amqp
try
{
- Destination dest = null;
- if (destination != null)
- {
- dest.Path = destination.ToString();
- }
- consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, acknowledgementMode);
+ Queue queue = new Queue(destination.ToString());
+ consumer = DoCreateMessageConsumer(GetNextConsumerId(), queue, acknowledgementMode);
consumer.ConsumerTransformer = this.ConsumerTransformer;