You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/13 00:00:44 UTC
svn commit: r627169 [2/3] - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/
src/main/csharp/OpenWire/V1/ src/main/csharp/OpenWire/V2/
src/main/csharp/Transport/ src/main/csharp/Transport/Stomp/
src/main/csharp/Transport/Tcp/...
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Feb 12 15:00:40 2008
@@ -16,6 +16,7 @@
*/
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Util;
using Apache.NMS;
using System;
using System.Collections;
@@ -27,31 +28,32 @@
/// </summary>
public class Connection : IConnection
{
- private ITransport transport;
- private ConnectionInfo info;
+ private readonly Uri brokerUri;
+ private ITransport transport;
+ private readonly ConnectionInfo info;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
- private IList sessions = new ArrayList();
- private IDictionary consumers = new Hashtable(); // TODO threadsafe
- private bool asyncSend;
- private bool connected;
- private bool closed;
- private long sessionCounter;
- private long temporaryDestinationCounter;
+ private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+ private bool asyncSend = false;
+ private bool connected = false;
+ private bool closed = false;
+ private long sessionCounter = 0;
+ private long temporaryDestinationCounter = 0;
private long localTransactionCounter;
- private bool closing;
- private Util.AtomicBoolean started = new ActiveMQ.Util.AtomicBoolean(true);
+ private bool closing = false;
+ private readonly AtomicBoolean started = new AtomicBoolean(true);
private bool disposed = false;
- public Connection(ITransport transport, ConnectionInfo info)
+ public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
{
- this.transport = transport;
- this.info = info;
- this.transport.Command = new CommandHandler(OnCommand);
- this.transport.Exception = new ExceptionHandler(OnException);
- this.transport.Start();
- }
+ this.brokerUri = connectionUri;
+ this.info = info;
+ this.transport = transport;
+ this.transport.Command = OnCommand;
+ this.transport.Exception = OnException;
+ this.transport.Start();
+ }
~Connection()
{
@@ -66,7 +68,8 @@
get { return started.Value; }
}
-
+ #region Properties
+
/// <summary>
/// This property indicates whether or not async send is enabled.
/// </summary>
@@ -75,8 +78,19 @@
get { return asyncSend; }
set { asyncSend = value; }
}
-
-
+
+ /// <summary>
+ /// This property sets the acknowledgment mode for the connection.
+ /// The URI parameter connection.ackmode can be set to a string value
+ /// that maps to the enumeration value.
+ /// </summary>
+ public string AckMode
+ {
+ set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+ }
+
+ #endregion
+
/// <summary>
/// Starts asynchronous message delivery of incoming messages for this connection.
/// Synchronous delivery is unaffected.
@@ -120,25 +134,41 @@
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
- public ISession CreateSession(AcknowledgementMode acknowledgementMode)
+ public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
{
- SessionInfo info = CreateSessionInfo(acknowledgementMode);
- SyncRequest(info);
- Session session = new Session(this, info, acknowledgementMode);
- sessions.Add(session);
+ SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
+ SyncRequest(info);
+ Session session = new Session(this, info, sessionAcknowledgementMode);
+
+ // Set properties on session using parameters prefixed with "session."
+ System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(this.brokerUri.Query);
+ URISupport.SetProperties(session, map, "session.");
+
+ sessions.Add(session);
return session;
}
+ public void RemoveSession(Session session)
+ {
+ DisposeOf(session.SessionId);
+
+ if(!closing)
+ {
+ sessions.Remove(session);
+ }
+ }
+
public void Close()
{
- if (!closed)
+ if(!closed)
{
closing = true;
- foreach (Session session in sessions)
+ foreach(Session session in sessions)
{
session.Close();
}
sessions.Clear();
+
try
{
DisposeOf(ConnectionId);
@@ -148,6 +178,7 @@
{
Tracer.ErrorFormat("Error during connection close: {0}", ex);
}
+
transport.Dispose();
transport = null;
closed = true;
@@ -188,23 +219,29 @@
}
// Properties
-
- public ITransport ITransport
+
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ }
+
+ public ITransport ITransport
{
get { return transport; }
set { this.transport = value; }
}
- public AcknowledgementMode AcknowledgementMode
- {
- get { return acknowledgementMode; }
- set { this.acknowledgementMode = value; }
- }
-
- public string ClientId
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { this.acknowledgementMode = value; }
+ }
+
+ public string ClientId
{
get { return info.ClientId; }
- set {
+ set
+ {
if (connected)
{
throw new NMSException("You cannot change the ClientId once the Connection is connected");
@@ -215,32 +252,26 @@
public ConnectionId ConnectionId
{
- get {
- return info.ConnectionId;
- }
+ get { return info.ConnectionId; }
}
public BrokerInfo BrokerInfo
{
- get {
- return brokerInfo;
- }
+ get { return brokerInfo; }
}
public WireFormatInfo BrokerWireFormat
{
- get {
- return brokerWireFormatInfo;
- }
+ get { return brokerWireFormatInfo; }
}
// Implementation methods
-
- /// <summary>
+
+ /// <summary>
/// Performs a synchronous request-response with the broker
/// </summary>
public Response SyncRequest(Command command)
- {
+ {
CheckConnected();
Response response = transport.Request(command);
if (response is ExceptionResponse)
@@ -251,21 +282,23 @@
}
return response;
}
-
- public void OneWay(Command command)
- {
- CheckConnected();
- transport.Oneway(command);
- }
-
- public void DisposeOf(DataStructure objectId)
- {
- RemoveInfo command = new RemoveInfo();
- command.ObjectId = objectId;
- transport.Oneway(command);
- }
-
-
+
+ public void OneWay(Command command)
+ {
+ CheckConnected();
+ transport.Oneway(command);
+ }
+
+ public void DisposeOf(DataStructure objectId)
+ {
+ RemoveInfo command = new RemoveInfo();
+ command.ObjectId = objectId;
+ // Ensure that the object is disposed to avoid potential race-conditions
+ // of trying to re-create the same object in the broker faster than
+ // the broker can dispose of the object.
+ SyncRequest(command);
+ }
+
/// <summary>
/// Creates a new temporary destination name
/// </summary>
@@ -301,67 +334,38 @@
{
connected = true;
// now lets send the connection and see if we get an ack/nak
- SyncRequest(info);
- }
- }
-
- /// <summary>
- /// Register a new consumer
- /// </summary>
- /// <param name="consumerId">A ConsumerId</param>
- /// <param name="consumer">A MessageConsumer</param>
- public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer)
- {
- consumers[consumerId] = consumer;
- }
-
-
- /// <summary>
- /// Remove a consumer
- /// </summary>
- /// <param name="consumerId">A ConsumerId</param>
- public void RemoveConsumer(ConsumerId consumerId)
- {
- consumers[consumerId] = null;
+ if(null == SyncRequest(info))
+ {
+ throw new ConnectionClosedException();
+ }
+ }
}
-
- /// <summary>
+ /// <summary>
/// Handle incoming commands
/// </summary>
- /// <param name="transport">An ITransport</param>
+ /// <param name="commandTransport">An ITransport</param>
/// <param name="command">A Command</param>
- protected void OnCommand(ITransport transport, Command command)
+ protected void OnCommand(ITransport commandTransport, Command command)
{
- if (command is MessageDispatch)
+ if(command is MessageDispatch)
{
- MessageDispatch dispatch = (MessageDispatch) command;
- ConsumerId consumerId = dispatch.ConsumerId;
- MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
- if (consumer == null)
- {
- Tracer.Error("No such consumer active: " + consumerId);
- }
- else
- {
- ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
- consumer.Dispatch(message);
- }
+ DispatchMessage((MessageDispatch) command);
}
- else if (command is WireFormatInfo)
+ else if(command is WireFormatInfo)
{
this.brokerWireFormatInfo = (WireFormatInfo) command;
}
- else if (command is BrokerInfo)
+ else if(command is BrokerInfo)
{
this.brokerInfo = (BrokerInfo) command;
}
- else if (command is ShutdownInfo)
+ else if(command is ShutdownInfo)
{
//ShutdownInfo info = (ShutdownInfo)command;
- if( !closing && !closed )
+ if(!closing && !closed)
{
- OnException(transport, new NMSException("Broker closed this connection."));
+ OnException(commandTransport, new NMSException("Broker closed this connection."));
}
}
else
@@ -370,21 +374,44 @@
}
}
- protected void OnException(ITransport sender, Exception exception)
+ protected void DispatchMessage(MessageDispatch dispatch)
+ {
+ bool dispatched = false;
+
+ foreach(Session session in sessions)
+ {
+ if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+ {
+ dispatched = true;
+ break;
+ }
+ }
+
+ if(!dispatched)
+ {
+ Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+ }
+ }
+
+ protected void OnException(ITransport sender, Exception exception)
{
Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
- if (ExceptionListener != null)
- ExceptionListener(exception);
+ if(ExceptionListener != null)
+ {
+ ExceptionListener(exception);
+ }
}
internal void OnSessionException(Session sender, Exception exception)
{
Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
- if (ExceptionListener != null)
+ if(ExceptionListener != null)
+ {
ExceptionListener(exception);
+ }
}
- protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
+ protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
{
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Tue Feb 12 15:00:40 2008
@@ -82,25 +82,21 @@
public IConnection CreateConnection(string userName, string password)
{
- ConnectionInfo info = CreateConnectionInfo(userName, password);
-
- ITransportFactory tcpTransportFactory = new TcpTransportFactory();
-
Uri uri = brokerUri;
// Do we need to strip off the activemq prefix??
- if ("activemq".Equals(brokerUri.Scheme))
+ if("activemq".Equals(brokerUri.Scheme))
{
- uri = new Uri(brokerUri.AbsolutePath);
+ uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
}
+ ConnectionInfo info = CreateConnectionInfo(userName, password);
+ ITransportFactory tcpTransportFactory = new TcpTransportFactory();
ITransport transport = tcpTransportFactory.CreateTransport(uri);
+ Connection connection = new Connection(uri, transport, info);
- IConnection connection = new Connection(transport, info);
- connection.ClientId = info.ClientId;
-
- // Set properties on connection using parameters prefixed with "jms."
+ // Set properties on connection using parameters prefixed with "connection."
System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(brokerUri.Query);
- URISupport.SetProperties(connection, map, "jms.");
+ URISupport.SetProperties(connection, map, "connection.");
return connection;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Tue Feb 12 15:00:40 2008
@@ -94,8 +94,11 @@
{
lock (semaphore)
{
- queue.Enqueue(message);
- messageReceivedEventHandle.Set();
+ if(!m_bClosed)
+ {
+ queue.Enqueue(message);
+ messageReceivedEventHandle.Set();
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Feb 12 15:00:40 2008
@@ -39,16 +39,21 @@
private readonly ConsumerInfo info;
private int maximumRedeliveryCount = 10;
private int redeliveryTimeout = 500;
- private readonly Session session;
+ private Session session;
+ private Session ackSession;
protected bool disposed = false;
// Constructor internal to prevent clients from creating an instance.
internal MessageConsumer(Session session, ConsumerInfo info,
- AcknowledgementMode acknowledgementMode)
+ AcknowledgementMode acknowledgementMode)
{
this.session = session;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
+ if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode)
+ {
+ this.ackSession = (Session) session.Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ }
}
~MessageConsumer()
@@ -94,19 +99,19 @@
public IMessage Receive()
{
CheckClosed();
- return SetupAcknowledge(dispatcher.Dequeue());
+ return SetupAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(System.TimeSpan timeout)
{
CheckClosed();
- return SetupAcknowledge(dispatcher.Dequeue(timeout));
+ return SetupAcknowledge(dispatcher.Dequeue(timeout));
}
public IMessage ReceiveNoWait()
{
CheckClosed();
- return SetupAcknowledge(dispatcher.DequeueNoWait());
+ return SetupAcknowledge(dispatcher.DequeueNoWait());
}
public void Dispose()
@@ -129,7 +134,6 @@
try
{
- session.Connection.DisposeOf(info.ConsumerId);
Close();
}
catch
@@ -152,9 +156,17 @@
// wake up any pending dequeue() call on the dispatcher
dispatcher.Close();
+ session.DisposeOf(info.ConsumerId);
+ session = null;
lock(this)
{
+ if(ackSession != null)
+ {
+ ackSession.Close();
+ ackSession = null;
+ }
+
closed = true;
}
}
@@ -174,6 +186,17 @@
/// <param name="message">An ActiveMQMessage</param>
public void Dispatch(ActiveMQMessage message)
{
+ lock(this)
+ {
+ if(ackSession != null)
+ {
+ message.Acknowledger += DoNothingAcknowledge;
+ MessageAck ack = CreateMessageAck(message);
+ Tracer.Debug("Sending AutoAck: " + ack);
+ ackSession.Connection.OneWay(ack);
+ }
+ }
+
dispatcher.Enqueue(message);
}
@@ -190,7 +213,7 @@
break;
}
- message = SetupAcknowledge(message);
+ message = SetupAcknowledge(message);
// invoke listener. Exceptions caught by the dispatcher thread
listener(message);
}
@@ -209,36 +232,34 @@
protected IMessage SetupAcknowledge(IMessage message)
{
- if (message == null)
- return null;
+ if(null == message)
+ {
+ return null;
+ }
+
+ if(message is ActiveMQMessage)
+ {
+ ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+
+ if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode)
+ {
+ activeMessage.Acknowledger += DoClientAcknowledge;
+ }
+ else if(AcknowledgementMode.AutoAcknowledge != acknowledgementMode)
+ {
+ activeMessage.Acknowledger += DoNothingAcknowledge;
+ DoClientAcknowledge(activeMessage);
+ }
+ }
- if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
- {
- if (message is ActiveMQMessage)
- {
- ActiveMQMessage activeMessage = (ActiveMQMessage)message;
- activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
- }
- }
- else
- {
- if (message is ActiveMQMessage)
- {
- ActiveMQMessage activeMessage = (ActiveMQMessage)message;
- activeMessage.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-
- MessageAck ack = CreateMessageAck(activeMessage);
- Tracer.Debug("Sending Ack: " + ack);
- session.Connection.OneWay(ack);
- }
- }
return message;
}
- protected void DoNothingAcknowledge(ActiveMQMessage message)
- {
- }
- protected void DoClientAcknowledge(ActiveMQMessage message)
+ protected void DoNothingAcknowledge(ActiveMQMessage message)
+ {
+ }
+
+ protected void DoClientAcknowledge(ActiveMQMessage message)
{
MessageAck ack = CreateMessageAck(message);
Tracer.Debug("Sending Ack: " + ack);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Tue Feb 12 15:00:40 2008
@@ -15,7 +15,6 @@
* limitations under the License.
*/
using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Util;
using Apache.NMS;
using System;
@@ -26,7 +25,8 @@
/// </summary>
public class MessageProducer : IMessageProducer
{
- private readonly Session session;
+ private Session session;
+ private bool closed = false;
private readonly ProducerInfo info;
private long messageCounter = 0;
@@ -69,7 +69,7 @@
try
{
- session.Connection.DisposeOf(info.ProducerId);
+ Close();
}
catch
{
@@ -79,6 +79,36 @@
disposed = true;
}
+ public void Close()
+ {
+ lock(this)
+ {
+ if(closed)
+ {
+ return;
+ }
+ }
+
+ session.DisposeOf(info.ProducerId);
+ session = null;
+
+ lock(this)
+ {
+ closed = true;
+ }
+ }
+
+ protected void CheckClosed()
+ {
+ lock(this)
+ {
+ if(closed)
+ {
+ throw new ConnectionClosedException();
+ }
+ }
+ }
+
public void Send(IMessage message)
{
Send(info.Destination, message);
@@ -101,7 +131,8 @@
protected void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive, bool specifiedTimeToLive)
{
- ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+ CheckClosed();
+ ActiveMQMessage activeMessage = (ActiveMQMessage) message;
if (!disableMessageID)
{
@@ -171,36 +202,43 @@
public IMessage CreateMessage()
{
+ CheckClosed();
return session.CreateMessage();
}
public ITextMessage CreateTextMessage()
{
+ CheckClosed();
return session.CreateTextMessage();
}
public ITextMessage CreateTextMessage(string text)
{
+ CheckClosed();
return session.CreateTextMessage(text);
}
public IMapMessage CreateMapMessage()
{
+ CheckClosed();
return session.CreateMapMessage();
}
public IObjectMessage CreateObjectMessage(object body)
{
+ CheckClosed();
return session.CreateObjectMessage(body);
}
public IBytesMessage CreateBytesMessage()
{
+ CheckClosed();
return session.CreateBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
+ CheckClosed();
return session.CreateBytesMessage(body);
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -86,13 +86,11 @@
public override void TightMarshal2(OpenWireFormat wireFormat, Object o, BinaryWriter dataOut, BooleanStream bs) {
base.TightMarshal2(wireFormat, o, dataOut, bs);
- ConnectionControl info = (ConnectionControl)o;
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
-
}
//
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -1,4 +1,4 @@
-/**
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -86,7 +86,6 @@
public override void TightMarshal2(OpenWireFormat wireFormat, Object o, BinaryWriter dataOut, BooleanStream bs) {
base.TightMarshal2(wireFormat, o, dataOut, bs);
- ConnectionControl info = (ConnectionControl)o;
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Tue Feb 12 15:00:40 2008
@@ -26,21 +26,23 @@
/// </summary>
public class Session : ISession
{
- private AcknowledgementMode acknowledgementMode;
+ private readonly AcknowledgementMode acknowledgementMode;
private bool asyncSend;
private Connection connection;
private long consumerCounter;
- private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private bool dispatchAsync;
- private DispatchingThread dispatchingThread;
+ private readonly DispatchingThread dispatchingThread;
private bool exclusive;
- private SessionInfo info;
+ private readonly SessionInfo info;
private int maximumPendingMessageLimit;
private int prefetchSize = 1000;
private byte priority;
private long producerCounter;
private bool retroactive;
- private TransactionContext transactionContext;
+ private readonly TransactionContext transactionContext;
+ internal bool startedAsyncDelivery = false;
private bool disposed = false;
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
@@ -50,10 +52,8 @@
this.acknowledgementMode = acknowledgementMode;
this.asyncSend = connection.AsyncSend;
transactionContext = new TransactionContext(this);
- dispatchingThread =
- new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
- dispatchingThread.ExceptionListener +=
- new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+ dispatchingThread = new DispatchingThread(DispatchAsyncMessages);
+ dispatchingThread.ExceptionListener += dispatchingThread_ExceptionListener;
}
~Session()
@@ -166,7 +166,7 @@
try
{
- connection.DisposeOf(info.SessionId);
+ Close();
}
catch
{
@@ -184,10 +184,27 @@
public IMessageProducer CreateProducer(IDestination destination)
{
ProducerInfo command = CreateProducerInfo(destination);
- connection.SyncRequest(command);
- return new MessageProducer(this, command);
- }
+ ProducerId producerId = command.ProducerId;
+ MessageProducer producer = null;
+
+ try
+ {
+ producer = new MessageProducer(this, command);
+ connection.SyncRequest(command);
+ producers[producerId] = producer;
+ }
+ catch(Exception)
+ {
+ if(producer != null)
+ {
+ producer.Close();
+ }
+
+ throw;
+ }
+ return producer;
+ }
public IMessageConsumer CreateConsumer(IDestination destination)
{
@@ -206,22 +223,24 @@
command.AcknowledgementMode = acknowledgementMode;
ConsumerId consumerId = command.ConsumerId;
+ MessageConsumer consumer = null;
try
{
- MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+ consumer = new MessageConsumer(this, command, acknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
- connection.AddConsumer(consumerId, consumer);
-
- connection.SyncRequest(command);
-
consumers[consumerId] = consumer;
+ connection.SyncRequest(command);
return consumer;
}
- catch(Exception e)
+ catch(Exception)
{
- connection.RemoveConsumer(consumerId);
- throw e;
+ if(consumer != null)
+ {
+ consumer.Close();
+ }
+
+ throw;
}
}
@@ -235,23 +254,26 @@
ConsumerId consumerId = command.ConsumerId;
command.SubscriptionName = name;
command.NoLocal = noLocal;
+ MessageConsumer consumer = null;
try
{
- MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+ consumer = new MessageConsumer(this, command, acknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
- connection.AddConsumer(consumerId, consumer);
-
- connection.SyncRequest(command);
-
consumers[consumerId] = consumer;
- return consumer;
+ connection.SyncRequest(command);
}
- catch(Exception e)
+ catch(Exception)
{
- connection.RemoveConsumer(consumerId);
- throw e;
+ if(consumer != null)
+ {
+ consumer.Close();
+ }
+
+ throw;
}
+
+ return consumer;
}
public IQueue GetQueue(string name)
@@ -368,8 +390,20 @@
public void Close()
{
- // To do: what about session id?
+ connection.RemoveSession(this);
StopAsyncDelivery();
+ foreach(MessageConsumer consumer in GetConsumers())
+ {
+ consumer.Close();
+ }
+ consumers.Clear();
+
+ foreach(MessageProducer producer in GetProducers())
+ {
+ producer.Close();
+ }
+ producers.Clear();
+ connection = null;
}
#endregion
@@ -424,9 +458,28 @@
public void DisposeOf(ConsumerId objectId)
{
+ connection.DisposeOf(objectId);
consumers.Remove(objectId);
- connection.RemoveConsumer(objectId);
+ }
+
+ public void DisposeOf(ProducerId objectId)
+ {
connection.DisposeOf(objectId);
+ producers.Remove(objectId);
+ }
+
+ public bool DispatchMessage(ConsumerId consumerId, Message message)
+ {
+ bool dispatched = false;
+ MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+
+ if(consumer != null)
+ {
+ consumer.Dispatch((ActiveMQMessage) message);
+ dispatched = true;
+ }
+
+ return dispatched;
}
/// <summary>
@@ -443,7 +496,6 @@
}
}
-
/// <summary>
/// Returns a copy of the current consumers in a thread safe way to avoid concurrency
/// problems if the consumers are changed in another thread
@@ -456,6 +508,18 @@
}
}
+ /// <summary>
+ /// Returns a copy of the current producers in a thread safe way to avoid concurrency
+ /// problems if the producers are changed in another thread
+ /// </summary>
+ protected ICollection GetProducers()
+ {
+ lock(producers.SyncRoot)
+ {
+ return new ArrayList(producers.Values);
+ }
+ }
+
protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
@@ -519,7 +583,11 @@
internal void StopAsyncDelivery()
{
- dispatchingThread.Stop();
+ if(startedAsyncDelivery)
+ {
+ dispatchingThread.Stop();
+ startedAsyncDelivery = false;
+ }
}
internal void StartAsyncDelivery(Dispatcher dispatcher)
@@ -528,7 +596,9 @@
{
dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
}
+
dispatchingThread.Start();
+ startedAsyncDelivery = true;
}
}
-}
\ No newline at end of file
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Tue Feb 12 15:00:40 2008
@@ -33,7 +33,7 @@
{
private TransactionId transactionId;
private Session session;
- private ArrayList synchronizations = new ArrayList();
+ private ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
public TransactionContext(Session session)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Tue Feb 12 15:00:40 2008
@@ -30,6 +30,11 @@
{
private static int maxWait = -1;
+ public int Timeout
+ {
+ get { return maxWait; }
+ set { maxWait = value; }
+ }
private readonly CountDownLatch latch = new CountDownLatch(1);
private Response response;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Tue Feb 12 15:00:40 2008
@@ -30,34 +30,42 @@
/// </summary>
public class ResponseCorrelator : TransportFilter
{
-
- private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
- private readonly Object mutex = new Object();
+ private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+ private readonly Object mutex = new Object();
private short nextCommandId;
+ private int requestTimeout = -1;
- public ResponseCorrelator(ITransport next) : base(next) {
+ public ResponseCorrelator(ITransport next, int requestTimeout) : base(next)
+ {
+ this.requestTimeout = requestTimeout;
}
- short GetNextCommandId() {
- lock(mutex) {
+ short GetNextCommandId()
+ {
+ lock(mutex)
+ {
return ++nextCommandId;
}
}
public override void Oneway(Command command)
{
- command.CommandId = GetNextCommandId();
+ int commandId = GetNextCommandId();
+
+ command.CommandId = commandId;
command.ResponseRequired = false;
next.Oneway(command);
}
public override FutureResponse AsyncRequest(Command command)
{
- command.CommandId = GetNextCommandId();
+ int commandId = GetNextCommandId();
+
+ command.CommandId = commandId;
command.ResponseRequired = true;
FutureResponse future = new FutureResponse();
- requestMap[command.CommandId] = future;
- next.Oneway(command);
+ requestMap[commandId] = future;
+ next.Oneway(command);
return future;
}
@@ -65,15 +73,18 @@
public override Response Request(Command command)
{
FutureResponse future = AsyncRequest(command);
+ future.Timeout = requestTimeout;
Response response = future.Response;
if (response != null && response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
- if (brokerError == null) {
+ if (brokerError == null)
+ {
throw new BrokerException();
}
- else {
+ else
+ {
throw new BrokerException(brokerError);
}
}
@@ -82,37 +93,41 @@
protected override void OnCommand(ITransport sender, Command command)
{
- if( command is Response ) {
-
+ if(command is Response)
+ {
Response response = (Response) command;
- FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
- if (future != null)
- {
- if (response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
- BrokerException exception = new BrokerException(brokerError);
- this.exceptionHandler(this, exception);
- }
- future.Response = response;
- }
- else
+ int correlationId = response.CorrelationId;
+
+ FutureResponse future = (FutureResponse) requestMap[correlationId];
+
+ if(future != null)
{
- if (command is ShutdownInfo)
- {
- // lets shutdown
- this.commandHandler(sender, command);
- }
- else {
- Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
- }
- }
- } else {
+ requestMap.Remove(correlationId);
+ future.Response = response;
+
+ if(response is ExceptionResponse)
+ {
+ ExceptionResponse er = (ExceptionResponse) response;
+ BrokerError brokerError = er.Exception;
+ BrokerException exception = new BrokerException(brokerError);
+ this.exceptionHandler(this, exception);
+ }
+ }
+ else
+ {
+ Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
+ }
+ }
+ else if(command is ShutdownInfo)
+ {
+ // lets shutdown
+ this.commandHandler(sender, command);
+ }
+ else
+ {
this.commandHandler(sender, command);
}
}
-
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs Tue Feb 12 15:00:40 2008
@@ -63,20 +63,33 @@
WriteHeader("content-length", contentLength);
}
}
-
+
public void WriteCommand(Command command, String name)
{
+ WriteCommand(command, name, false);
+ }
+
+ public void WriteCommand(Command command, String name, bool ignoreErrors)
+ {
builder.Append(name);
builder.Append(NEWLINE);
- if (command.ResponseRequired)
+ if(command.ResponseRequired)
{
- WriteHeader("receipt", command.CommandId);
+ if(ignoreErrors)
+ {
+ WriteHeader("receipt", "ignore:" + command.CommandId);
+ }
+ else
+ {
+ WriteHeader("receipt", command.CommandId);
+ }
}
}
public void WriteHeader(String name, Object value)
{
- if (value != null) {
+ if (value != null)
+ {
builder.Append(name);
builder.Append(SEPARATOR);
builder.Append(value);
@@ -86,7 +99,8 @@
public void WriteHeader(String name, bool value)
{
- if (value) {
+ if (value)
+ {
builder.Append(name);
builder.Append(SEPARATOR);
builder.Append("true");
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs Tue Feb 12 15:00:40 2008
@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.Reflection;
using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.OpenWire.V1;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS;
using System;
@@ -89,8 +87,8 @@
Response response = new Response();
response.CorrelationId = command.CommandId;
SendCommand(response);
+ Tracer.Debug("#### Autorespond to command: " + o.GetType());
}
- Tracer.Debug("#### Ignored command: " + o.GetType());
}
else
{
@@ -182,15 +180,22 @@
protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
{
- if (command == "RECEIPT" || command == "CONNECTED")
+ if(command == "RECEIPT" || command == "CONNECTED")
{
string text = RemoveHeader(headers, "receipt-id");
- if (text != null)
+ if(text != null)
{
Response answer = new Response();
+ if(text.StartsWith("ignore:"))
+ {
+ text = text.Substring("ignore:".Length);
+ }
+
answer.CorrelationId = Int32.Parse(text);
return answer;
- } else if( command == "CONNECTED") {
+ }
+ else if(command == "CONNECTED")
+ {
text = RemoveHeader(headers, "response-id");
if (text != null)
{
@@ -200,20 +205,31 @@
}
}
}
- else if (command == "ERROR")
+ else if(command == "ERROR")
{
- ExceptionResponse answer = new ExceptionResponse();
string text = RemoveHeader(headers, "receipt-id");
- if (text != null)
+
+ if(text != null && text.StartsWith("ignore:"))
{
- answer.CorrelationId = Int32.Parse(text);
+ Response answer = new Response();
+ answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
+ return answer;
+ }
+ else
+ {
+ ExceptionResponse answer = new ExceptionResponse();
+ if(text != null)
+ {
+ answer.CorrelationId = Int32.Parse(text);
+ }
+
+ BrokerError error = new BrokerError();
+ error.Message = RemoveHeader(headers, "message");
+ error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
+ // TODO is this the right header?
+ answer.Exception = error;
+ return answer;
}
-
- BrokerError error = new BrokerError();
- error.Message = RemoveHeader(headers, "message");
- error.ExceptionClass = RemoveHeader(headers, "exceptionClass"); // TODO is this the right header?
- answer.Exception = error;
- return answer;
}
else if (command == "MESSAGE")
{
@@ -236,11 +252,6 @@
message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
}
- if (message is ActiveMQTextMessage)
- {
- ActiveMQTextMessage textMessage = message as ActiveMQTextMessage;
- }
-
// TODO now lets set the various headers
message.Type = RemoveHeader(headers, "type");
@@ -309,6 +320,7 @@
protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
{
ss.WriteCommand(command, "DISCONNECT");
+ System.Diagnostics.Debug.Assert(!command.ResponseRequired);
ss.Flush();
}
@@ -321,12 +333,7 @@
ss.WriteHeader("selector", command.Selector);
if ( command.NoLocal )
ss.WriteHeader("no-local", command.NoLocal);
-
- if ( AcknowledgementMode.ClientAcknowledge == command.AcknowledgementMode
- || AcknowledgementMode.AutoClientAcknowledge == command.AcknowledgementMode
- || AcknowledgementMode.Transactional == command.AcknowledgementMode
- )
- ss.WriteHeader("ack", "client");
+ ss.WriteHeader("ack", "client");
// ActiveMQ extensions to STOMP
ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
@@ -346,19 +353,19 @@
protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
{
object id = command.ObjectId;
+
if (id is ConsumerId)
{
ConsumerId consumerId = id as ConsumerId;
ss.WriteCommand(command, "UNSUBSCRIBE");
- ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+ ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
ss.Flush();
consumers.Remove(consumerId);
}
- // When a session is removed, it needs to remove it's consumers too.
- if (id is SessionId)
+ else if (id is SessionId)
{
-
- // Find all the consumer that were part of the session.
+ // When a session is removed, it needs to remove its consumers too.
+ // Find all the consumers that were part of the session.
SessionId sessionId = (SessionId) id;
ArrayList matches = new ArrayList();
foreach (DictionaryEntry entry in consumers)
@@ -370,6 +377,8 @@
}
}
+ bool unsubscribedConsumer = false;
+
// Un-subscribe them.
foreach (ConsumerId consumerId in matches)
{
@@ -377,8 +386,34 @@
ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
ss.Flush();
consumers.Remove(consumerId);
+ unsubscribedConsumer = true;
}
+
+ if(!unsubscribedConsumer && command.ResponseRequired)
+ {
+ ss.WriteCommand(command, "UNSUBSCRIBE", true);
+ ss.WriteHeader("id", sessionId);
+ ss.Flush();
+ }
}
+ else if(id is ProducerId)
+ {
+ if(command.ResponseRequired)
+ {
+ ss.WriteCommand(command, "UNSUBSCRIBE", true);
+ ss.WriteHeader("id", id);
+ ss.Flush();
+ }
+ }
+ else if(id is ConnectionId)
+ {
+ if(command.ResponseRequired)
+ {
+ ss.WriteCommand(command, "UNSUBSCRIBE", true);
+ ss.WriteHeader("id", id);
+ ss.Flush();
+ }
+ }
}
@@ -458,12 +493,14 @@
protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
{
- ss.WriteCommand(command, "ACK");
+ ss.WriteCommand(command, "ACK", true);
// TODO handle bulk ACKs?
- ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
- if( command.TransactionId!=null )
+ ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
+ if(command.TransactionId != null)
+ {
ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+ }
ss.Flush();
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Feb 12 15:00:40 2008
@@ -30,19 +30,21 @@
/// </summary>
public class TcpTransport : ITransport
{
- private readonly object initLock = "initLock";
- private readonly Socket socket;
- private IWireFormat wireformat;
+ private readonly object initLock = new object();
+ private readonly Socket socket;
+ private IWireFormat wireformat;
private BinaryReader socketReader;
- private readonly object socketReaderLock = "socketReaderLock";
+ private readonly object socketReaderLock = new object();
private BinaryWriter socketWriter;
- private readonly object socketWriterLock = "socketWriterLock";
+ private readonly object socketWriterLock = new object();
private Thread readThread;
private bool started;
private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
+ private const int MAX_THREAD_WAIT = 30000;
+
public TcpTransport(Socket socket, IWireFormat wireformat)
{
@@ -72,7 +74,7 @@
}
started = true;
-
+
// As reported in AMQ-988 it appears that NetworkStream is not thread safe
// so lets use an instance for each of the 2 streams
socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
@@ -125,11 +127,11 @@
public void Close()
{
- if (closed.CompareAndSet(false, true))
+ if(closed.CompareAndSet(false, true))
{
- lock (initLock)
+ lock(initLock)
{
- try
+ try
{
socket.Shutdown(SocketShutdown.Both);
}
@@ -137,7 +139,7 @@
{
}
- lock (socketWriterLock)
+ lock(socketWriterLock)
{
if(null != socketWriter)
{
@@ -146,7 +148,7 @@
}
}
- lock (socketReaderLock)
+ lock(socketReaderLock)
{
if(null != socketReader)
{
@@ -157,20 +159,25 @@
socket.Close();
- if(null != readThread
- && Thread.CurrentThread != readThread
+ if(null != readThread)
+ {
+ if(Thread.CurrentThread != readThread
#if !NETCF
- && readThread.IsAlive
+ && readThread.IsAlive
#endif
- )
- {
- readThread.Abort();
- readThread.Join();
+ )
+ {
+ if(!readThread.Join(MAX_THREAD_WAIT))
+ {
+ readThread.Abort();
+ }
+ }
+
readThread = null;
}
- }
- started = false;
+ started = false;
+ }
}
}
@@ -195,32 +202,35 @@
// An exception in the command handler may not be fatal to the transport, so
// these are simply reported to the exceptionHandler.
//
- while (!closed.Value)
+ while(!closed.Value)
{
Command command = null;
- try
+
+ try
{
command = (Command) Wireformat.Unmarshal(socketReader);
}
catch(Exception ex)
{
- if (!closed.Value)
- {
+ command = null;
+ if(!closed.Value)
+ {
// Close the socket as there's little that can be done with this transport now.
Close();
this.exceptionHandler(this, ex);
- break;
- }
+ }
+
+ break;
}
try
{
- if (command != null)
+ if(command != null)
{
this.commandHandler(this, command);
}
}
- catch (Exception e)
+ catch(Exception e)
{
this.exceptionHandler(this, e);
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Feb 12 15:00:40 2008
@@ -28,18 +28,35 @@
{
public class TcpTransportFactory : ITransportFactory
{
- private bool useLogging = false;
-
public TcpTransportFactory()
{
}
+ #region Properties
+
+ private bool useLogging = false;
public bool UseLogging
{
get { return useLogging; }
set { useLogging = value; }
}
+ private string wireFormat = "OpenWire";
+ public string WireFormat
+ {
+ get { return wireFormat; }
+ set { wireFormat = value; }
+ }
+
+ private int requestTimeout = -1;
+ public int RequestTimeout
+ {
+ get { return requestTimeout; }
+ set { requestTimeout = value; }
+ }
+
+ #endregion
+
#region ITransportFactory Members
public ITransport CreateTransport(Uri location)
@@ -68,7 +85,7 @@
}
transport = new MutexTransport(transport);
- transport = new ResponseCorrelator(transport);
+ transport = new ResponseCorrelator(transport, requestTimeout);
return transport;
}
@@ -79,7 +96,16 @@
{
// Looping through the AddressList allows different type of connections to be tried
// (IPv4, IPv6 and whatever else may be available).
+#if MONO
+ // The following GetHostByName() API has been obsoleted in .NET 2.0. It has been
+ // superseded by GetHostEntry(). At some point, it will probably be removed
+ // from the Mono class library, and this #if statement can be removed.
+
+ IPHostEntry hostEntry = Dns.GetHostByName(host);
+#else
IPHostEntry hostEntry = Dns.GetHostEntry(host);
+#endif
+
foreach(IPAddress address in hostEntry.AddressList)
{
Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
@@ -95,19 +121,24 @@
protected IWireFormat CreateWireFormat(Uri location, StringDictionary map)
{
object properties = null;
- IWireFormat wireFormat = null;
+ IWireFormat wireFormatItf = null;
// Detect STOMP etc
if(String.Compare(location.Scheme, "stomp", true) == 0)
{
- wireFormat = new StompWireFormat();
- properties = wireFormat;
+ this.wireFormat = "STOMP";
+ }
+
+ if(String.Compare(this.wireFormat, "stomp", true) == 0)
+ {
+ wireFormatItf = new StompWireFormat();
+ properties = wireFormatItf;
}
else
{
OpenWireFormat openwireFormat = new OpenWireFormat();
- wireFormat = openwireFormat;
+ wireFormatItf = openwireFormat;
properties = openwireFormat.PreferedWireFormatInfo;
}
@@ -117,7 +148,7 @@
URISupport.SetProperties(properties, map, "wireFormat.");
}
- return wireFormat;
+ return wireFormatItf;
}
}
}
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs?rev=627169&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs Tue Feb 12 15:00:40 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ.Util
+{
+ class NMSConvert
+ {
+ public static AcknowledgementMode ToAcknowledgementMode(string ackText)
+ {
+ if(String.Compare(ackText, "AutoAcknowledge", true) == 0)
+ {
+ return AcknowledgementMode.AutoAcknowledge;
+ }
+ else if(String.Compare(ackText, "ClientAcknowledge", true) == 0)
+ {
+ return AcknowledgementMode.ClientAcknowledge;
+ }
+ else if(String.Compare(ackText, "DupsOkAcknowledge", true) == 0)
+ {
+ return AcknowledgementMode.DupsOkAcknowledge;
+ }
+ else if(String.Compare(ackText, "Transactional", true) == 0)
+ {
+ return AcknowledgementMode.Transactional;
+ }
+ else
+ {
+ return AcknowledgementMode.AutoAcknowledge;
+ }
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs Tue Feb 12 15:00:40 2008
@@ -14,21 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
using NUnit.Framework;
-using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
- public class BadConsumeTest : NMS.Test.BadConsumeTest
+ public class BadConsumeTest_OpenWire : Apache.NMS.Test.BadConsumeTest
{
protected override IConnectionFactory CreateConnectionFactory()
{
- return new ConnectionFactory();
+ return TestUtils.CreateOpenWireConnectionFactory();
}
}
+
+ [TestFixture]
+ public class BadConsumeTest_Stomp : Apache.NMS.Test.BadConsumeTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateStompConnectionFactory();
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
using NUnit.Framework;
-using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
- public class BytesMessageTest : NMS.Test.BytesMessageTest
+ public class BytesMessageTest_OpenWire : Apache.NMS.Test.BytesMessageTest
{
protected override IConnectionFactory CreateConnectionFactory()
{
- return new ConnectionFactory();
+ return TestUtils.CreateOpenWireConnectionFactory();
}
}
-}
-
-
+ [TestFixture]
+ public class BytesMessageTest_Stomp : Apache.NMS.Test.BytesMessageTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateStompConnectionFactory();
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs Tue Feb 12 15:00:40 2008
@@ -19,7 +19,7 @@
using System.Collections;
-namespace Apache.NMS.ActiveMQ.Commands
+namespace Apache.NMS.ActiveMQ.Test.Commands
{
[TestFixture]
public class CommandTest
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs Tue Feb 12 15:00:40 2008
@@ -20,7 +20,7 @@
[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
[assembly: AssemblyProductAttribute("Apache NMS for ActiveMQ Class Library")]
-[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2007 Apache Software Foundation")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2008 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
[assembly: AssemblyVersionAttribute("1.0")]
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
using NUnit.Framework;
-using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
- public class ConsumerTest : NMS.Test.ConsumerTest
+ public class ConsumerTest_OpenWire : Apache.NMS.Test.ConsumerTest
{
protected override IConnectionFactory CreateConnectionFactory()
{
- return new ConnectionFactory();
+ return TestUtils.CreateOpenWireConnectionFactory();
}
}
-}
-
-
+ [TestFixture]
+ public class ConsumerTest_Stomp : Apache.NMS.Test.ConsumerTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateStompConnectionFactory();
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs Tue Feb 12 15:00:40 2008
@@ -14,18 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
-using Apache.NMS;
using NUnit.Framework;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
- public class DurableTest : NMS.Test.DurableTest
+ public class DurableTest_OpenWire : Apache.NMS.Test.DurableTest
{
protected override IConnectionFactory CreateConnectionFactory()
{
- return new ConnectionFactory();
+ return TestUtils.CreateOpenWireConnectionFactory();
}
}
+
+ [TestFixture]
+ public class DurableTest_Stomp : Apache.NMS.Test.DurableTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateStompConnectionFactory();
+ }
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
using NUnit.Framework;
-using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
- [ TestFixture ]
- public class MapMessageTest : NMS.Test.MapMessageTest
- {
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return new ConnectionFactory();
- }
-
- }
-}
+ [TestFixture]
+ public class MapMessageTest_OpenWire : Apache.NMS.Test.MapMessageTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateOpenWireConnectionFactory();
+ }
+ }
+
+ [TestFixture]
+ public class MapMessageTest_Stomp : Apache.NMS.Test.MapMessageTest
+ {
+ protected override IConnectionFactory CreateConnectionFactory()
+ {
+ return TestUtils.CreateStompConnectionFactory();
+ }
+ public override void SendAndSyncReceive()
+ {
+ // TODO disable test
+ }
+ protected override void AssertValidMessage(IMessage message)
+ {
+ System.Console.WriteLine("Received MapMessage: " + message);
+
+ Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!");
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Tue Feb 12 15:00:40 2008
@@ -1,58 +1,52 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using Apache.NMS;
-using NUnit.Framework;
-using System.Reflection;
-using System.IO;
-
-namespace Apache.NMS.ActiveMQ
-{
- [TestFixture]
- public class NMSConnectionFactoryTest
- {
- [Test]
- public void TestTcpURI()
- {
- NMSConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
- Assert.IsNotNull(factory);
- Assert.IsNotNull(factory.ConnectionFactory);
- Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
- }
- [Test]
- public void TestStompURI()
- {
- NMSConnectionFactory factory = new NMSConnectionFactory("stomp://localhost:61616");
- Assert.IsNotNull(factory);
- Assert.IsNotNull(factory.ConnectionFactory);
- Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
- }
-
- [Test]
- public void TestActiveMQURI()
- {
- NMSConnectionFactory factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");
- Assert.IsNotNull(factory);
- Assert.IsNotNull(factory.ConnectionFactory);
- Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
- }
-
- }
-
-
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ [TestFixture]
+ public class NMSConnectionFactoryTest
+ {
+ [Test]
+ public void TestTcpURI()
+ {
+ NMSConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+ }
+
+ [Test]
+ public void TestStompURI()
+ {
+ NMSConnectionFactory factory = new NMSConnectionFactory("stomp://localhost:61613");
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+ }
+
+ [Test]
+ public void TestActiveMQURI()
+ {
+ NMSConnectionFactory factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+ }
+ }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs Tue Feb 12 15:00:40 2008
@@ -14,23 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-//using Apache.NMS.ActiveMQ;
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
+
using NUnit.Framework;
-using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
{
- [ TestFixture ]
- public class NMSPropertyTest : NMS.Test.NMSPropertyTest
+ [TestFixture]
+ public class NMSPropertyTest_OpenWire : Apache.NMS.Test.NMSPropertyTest
{
protected override IConnectionFactory CreateConnectionFactory()
{
- return new ConnectionFactory();
- }
+ return TestUtils.CreateOpenWireConnectionFactory();
+ }
}
-}
+    /// <summary>
+    /// Runs the inherited Apache.NMS.Test.NMSPropertyTest suite against the
+    /// connection factory returned by TestUtils.CreateStompConnectionFactory().
+    /// </summary>
+    [TestFixture]
+    public class NMSPropertyTest_Stomp : Apache.NMS.Test.NMSPropertyTest
+    {
+        /// <summary>
+        /// Supplies the Stomp connection factory used by the inherited tests.
+        /// </summary>
+        protected override IConnectionFactory CreateConnectionFactory()
+        {
+            return TestUtils.CreateStompConnectionFactory();
+        }
+
+        /// <summary>
+        /// Intentionally empty override: the non-string property checks are skipped for Stomp.
+        /// </summary>
+        protected override void AssertNonStringProperties(IMessage message)
+        {
+            // Let's disable typesafe property testing, as right now Stomp does not support them.
+        }
+
+        /// <summary>
+        /// Requires the message to carry a reply-to destination that is an ITemporaryQueue.
+        /// </summary>
+        protected override void AssertReplyToValid(IMessage message)
+        {
+            Assert.IsNotNull(message.NMSReplyTo, "NMSReplyTo");
+            // The failure text names the type the assertion actually checks (ITemporaryQueue).
+            Assert.IsTrue(message.NMSReplyTo is ITemporaryQueue, "The reply to destination is not a TemporaryQueue!: " + message.NMSReplyTo);
+        }
+    }
+}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs Tue Feb 12 15:00:40 2008
@@ -17,8 +17,10 @@
using NUnit.Framework;
using System;
using System.IO;
+using Apache.NMS.ActiveMQ.OpenWire;
-namespace Apache.NMS.ActiveMQ.OpenWire {
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
+{
[ TestFixture ]
public class BooleanStreamTest
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs Tue Feb 12 15:00:40 2008
@@ -19,7 +19,7 @@
using System;
using System.IO;
-namespace Apache.NMS.ActiveMQ.OpenWire
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
{
[TestFixture]
public class EndianTest
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs Tue Feb 12 15:00:40 2008
@@ -20,7 +20,7 @@
using System.IO;
using Apache.NMS.ActiveMQ.OpenWire;
-namespace Apache.NMS.ActiveMQ.OpenWire
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
{
[TestFixture]
public class PrimitiveMapTest
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs Tue Feb 12 15:00:40 2008
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
- [TestFixture]
- public class BadConsumeTest : NMS.Test.BadConsumeTest
- {
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return StompTestUtils.CreateStompConnectionFactory();
- }
- }
-}
-
-
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs Tue Feb 12 15:00:40 2008
@@ -1,27 +0,0 @@
-using System;
-using System.Reflection;
-using System.Runtime.InteropServices;
-
-//------------------------------------------------------------------------------
-// <auto-generated>
-// This code was generated by a tool.
-// Runtime Version:2.0.50727.832
-//
-// Changes to this file may cause incorrect behavior and will be lost if
-// the code is regenerated.
-// </auto-generated>
-//------------------------------------------------------------------------------
-
-[assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
-[assembly: AssemblyTitleAttribute("Stomp .NET Tests")]
-[assembly: AssemblyDescriptionAttribute("Unit Tests for the NMS (.Net Messaging Library) using the STOMP protocol")]
-[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
-[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/")]
-[assembly: AssemblyProductAttribute("Apache ActiveMQ")]
-[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2007 Apache Software Foundation")]
-[assembly: AssemblyTrademarkAttribute("")]
-[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0")]
-[assembly: AssemblyInformationalVersionAttribute("4.0")]
-
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs Tue Feb 12 15:00:40 2008
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
- [TestFixture]
- public class ConsumerTest : NMS.Test.ConsumerTest
- {
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return StompTestUtils.CreateStompConnectionFactory();
- }
- }
-}
-
-
-
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs Tue Feb 12 15:00:40 2008
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using Apache.NMS;
-using NUnit.Framework;
-
-namespace Apache.Stomp
-{
- [TestFixture]
- public class DurableTest : NMS.Test.DurableTest
- {
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return StompTestUtils.CreateStompConnectionFactory();
- }
- }
-}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.Stomp;
-using Apache.NMS;
-using NUnit.Framework;
-using System;
-
-namespace Apache.Stomp
-{
- [ TestFixture ]
- public class MapMessageTest : NMS.Test.MapMessageTest
- {
- public override void SendAndSyncReceive()
- {
- // TODO disable test
- }
-
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return StompTestUtils.CreateStompConnectionFactory();
- }
-
- protected override void AssertValidMessage(IMessage message)
- {
- Console.WriteLine("Received MapMessage: " + message);
-
- Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!");
- }
- }
-}
-
-
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs Tue Feb 12 15:00:40 2008
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
- [ TestFixture ]
- public class NMSPropertyTest : NMS.Test.NMSPropertyTest
- {
- protected override IConnectionFactory CreateConnectionFactory()
- {
- return StompTestUtils.CreateStompConnectionFactory();
- }
-
- protected override void AssertNonStringProperties(IMessage message)
- {
- // lets disable typesafe property testing as right now Stomp does not support them
- }
-
-
- protected override void AssertReplyToValid(IMessage message)
- {
- // TODO completely support temporary destinations in STOMP
-
- Assert.IsNotNull(message.NMSReplyTo, "NMSReplyTo");
- Assert.IsTrue(message.NMSReplyTo is ITemporaryQueue, "The reply to destination is not a TemporaryTopic!: " + message.NMSReplyTo);
- }
- }
-}
-
-
-