You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/18 23:36:12 UTC
svn commit: r1552134 - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp:
Connection.cs MessageConsumer.cs MessageProducer.cs Session.cs
SessionClosedException.cs
Author: tabish
Date: Wed Dec 18 22:36:11 2013
New Revision: 1552134
URL: http://svn.apache.org/r1552134
Log:
https://issues.apache.org/jira/browse/AMQNET-454
Apply patch https://issues.apache.org/jira/secure/attachment/12619423/Apache.NMS.AMQP-qpid-object-lifecycle-02.patch
Added:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Connection.cs Wed Dec 18 22:36:11 2013
@@ -50,7 +50,7 @@ namespace Apache.NMS.Amqp
private int sessionCounter = 0;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
+ private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
/// <summary>
/// Creates new connection
@@ -81,7 +81,7 @@ namespace Apache.NMS.Amqp
{
foreach (Session session in sessions)
{
- //session.Start();
+ session.Start();
}
}
}
@@ -336,7 +336,44 @@ namespace Apache.NMS.Amqp
public void Close()
{
- Dispose();
+ if (!this.closed.Value)
+ {
+ this.Stop();
+ }
+
+ lock (connectedLock)
+ {
+ if (this.closed.Value)
+ {
+ return;
+ }
+
+ try
+ {
+ Tracer.InfoFormat("Connection[]: Closing Connection Now.");
+ this.closing.Value = true;
+
+ lock (sessions.SyncRoot)
+ {
+ foreach (Session session in sessions)
+ {
+ session.Shutdown();
+ }
+ }
+ sessions.Clear();
+
+ }
+ catch (Exception ex)
+ {
+ Tracer.ErrorFormat("Connection[]: Error during connection close: {0}", ex);
+ }
+ finally
+ {
+ this.closed.Value = true;
+ this.connected.Value = false;
+ this.closing.Value = false;
+ }
+ }
}
public void PurgeTempDestinations()
@@ -359,5 +396,15 @@ namespace Apache.NMS.Amqp
{
return Interlocked.Increment(ref sessionCounter);
}
+
+ public Org.Apache.Qpid.Messaging.Session CreateQpidSession()
+ {
+ // TODO: Session name; transactional session
+ if (!connected.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+ return qpidConnection.CreateSession();
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Wed Dec 18 22:36:11 2013
@@ -1,6 +1,3 @@
-using System;
-using Org.Apache.Qpid.Messaging;
-using System.Threading;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,7 +14,11 @@ using System.Threading;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
+using System.Threading;
using Apache.NMS.Util;
+using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
@@ -26,6 +27,11 @@ namespace Apache.NMS.Amqp
/// </summary>
public class MessageConsumer : IMessageConsumer
{
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
+
protected TimeSpan zeroTimeout = new TimeSpan(0);
private readonly Session session;
@@ -38,6 +44,8 @@ namespace Apache.NMS.Amqp
private AutoResetEvent pause = new AutoResetEvent(false);
private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -54,6 +62,28 @@ namespace Apache.NMS.Amqp
this.acknowledgementMode = acknowledgementMode;
}
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!session.IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid sender
+ qpidReceiver = session.CreateQpidReceiver("");
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to create Qpid Receiver : " + e.Message);
+ }
+ }
+ }
+
public event MessageListener Listener
{
add
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Wed Dec 18 22:36:11 2013
@@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
+using System.Threading;
+using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
@@ -24,8 +27,13 @@ namespace Apache.NMS.Amqp
/// </summary>
public class MessageProducer : IMessageProducer
{
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
private readonly Session session;
+ private readonly int id;
private Destination destination;
//private long messageCounter;
@@ -34,10 +42,12 @@ namespace Apache.NMS.Amqp
private MsgPriority priority;
private bool disableMessageID;
private bool disableMessageTimestamp;
- private readonly int id;
//private IMessageConverter messageConverter;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private Org.Apache.Qpid.Messaging.Sender qpidSender = null;
+
private ProducerTransformerDelegate producerTransformer;
public ProducerTransformerDelegate ProducerTransformer
{
@@ -52,6 +62,28 @@ namespace Apache.NMS.Amqp
this.destination = destination;
}
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!session.IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid sender
+ qpidSender = session.CreateQpidSender("");
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to create Qpid Sender : " + e.Message);
+ }
+ }
+ }
+
public void Send(IMessage message)
{
Send(Destination, message);
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs?rev=1552134&r1=1552133&r2=1552134&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/Session.cs Wed Dec 18 22:36:11 2013
@@ -17,6 +17,7 @@
using System;
using System.Collections;
using System.Threading;
+using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
@@ -43,6 +44,8 @@ namespace Apache.NMS.Amqp
private int producerCounter;
private long nextDeliveryId;
private long lastDeliveredSequenceId;
+ private readonly object sessionLock = new object();
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
protected bool disposed = false;
protected bool closed = false;
protected bool closing = false;
@@ -50,6 +53,8 @@ namespace Apache.NMS.Amqp
private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TimeSpan requestTimeout;
+ private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start()
+
public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
@@ -61,6 +66,58 @@ namespace Apache.NMS.Amqp
// TODO: transactions
throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
}
+ if (connection.IsStarted)
+ {
+ this.Start();
+ }
+ connection.AddSession(this);
+ }
+
+ /// <summary>
+ /// Create new unmanaged session and start senders and receivers
+ /// Associated connection must be open.
+ /// </summary>
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!connection.IsStarted)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid session
+ qpidSession = connection.CreateQpidSession();
+
+ // Start producers and consumers
+ lock (producers.SyncRoot)
+ {
+ foreach (MessageProducer producer in producers.Values)
+ {
+ producer.Start();
+ }
+ }
+ lock (consumers.SyncRoot)
+ {
+ foreach (MessageConsumer consumer in consumers.Values)
+ {
+ consumer.Start();
+ }
+ }
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new SessionClosedException( "Failed to create session : " + e.Message );
+ }
+ }
+ }
+
+ public bool IsStarted
+ {
+ get { return started.Value; }
}
public void Dispose()
@@ -136,7 +193,7 @@ namespace Apache.NMS.Amqp
{
foreach (MessageConsumer consumer in consumers.Values)
{
- consumer.Shutdown();
+ consumer.Close();
}
}
consumers.Clear();
@@ -145,7 +202,7 @@ namespace Apache.NMS.Amqp
{
foreach (MessageProducer producer in producers.Values)
{
- producer.Shutdown();
+ producer.Close();
}
}
producers.Clear();
@@ -463,7 +520,26 @@ namespace Apache.NMS.Amqp
{
get { return id; }
}
-
+
+
+ public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(string address)
+ {
+ if (!IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+ return qpidSession.CreateReceiver(address);
+ }
+
+ public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(string address)
+ {
+ if (!IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+ return qpidSession.CreateSender(address);
+ }
+
#region Transaction State Events
public event SessionTxEventDelegate TransactionStartedListener;
Added: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs?rev=1552134&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs Wed Dec 18 22:36:11 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Exception thrown when a session is used that it already closed
+ /// </summary>
+ [Serializable]
+ public class SessionClosedException : NMSException
+ {
+ public SessionClosedException()
+ : base("The session is already closed!")
+ {
+ }
+
+ public SessionClosedException(string message)
+ : base(message)
+ {
+ }
+
+ public SessionClosedException(string message, string errorCode)
+ : base(message, errorCode)
+ {
+ }
+
+ public SessionClosedException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ public SessionClosedException(string message, string errorCode, Exception innerException)
+ : base(message, errorCode, innerException)
+ {
+ }
+
+ #region ISerializable interface implementation
+
+ /// <summary>
+ /// Initializes a new instance of the SessionClosedException class with serialized data.
+ /// Throws System.ArgumentNullException if the info parameter is null.
+ /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+ /// </summary>
+ /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+ /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+ protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+
+ #endregion
+ }
+}
\ No newline at end of file
Propchange: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/SessionClosedException.cs
------------------------------------------------------------------------------
svn:eol-style = native