You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/09/22 20:11:31 UTC
svn commit: r449031 - in /incubator/activemq/activemq-dotnet/trunk/src:
main/csharp/ main/csharp/ActiveMQ/ main/csharp/ActiveMQ/Transport/
main/csharp/ActiveMQ/Transport/Tcp/ test/csharp/ test/csharp/NMS/
Author: jstrachan
Date: Fri Sep 22 11:11:30 2006
New Revision: 449031
URL: http://svn.apache.org/viewvc?view=rev&rev=449031
Log:
Tidied up the wire format negotiation code for C# together with disabling logging when testing and some minor cleanups of the code
Added:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
Modified:
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs
incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs
incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Fri Sep 22 11:11:30 2006
@@ -160,8 +160,8 @@
if (response is ExceptionResponse)
{
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
- // TODO include stack trace
- throw new NMSException("Request failed: " + exceptionResponse);
+ BrokerError brokerError = exceptionResponse.Exception;
+ throw new BrokerException(brokerError);
}
return response;
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs Fri Sep 22 11:11:30 2006
@@ -21,50 +21,50 @@
namespace ActiveMQ.Transport
{
- /// <summary>
- /// A Transport which gaurds access to the next transport using a mutex.
- /// </summary>
- public class MutexTransport : TransportFilter
+ /// <summary>
+ /// A Transport which gaurds access to the next transport using a mutex.
+ /// </summary>
+ public class MutexTransport : TransportFilter
{
-
- private readonly object transmissionLock = new object();
-
- public MutexTransport(ITransport next) : base(next) {
- }
-
-
- public override void Oneway(Command command)
- {
+
+ private readonly object transmissionLock = new object();
+
+ public MutexTransport(ITransport next) : base(next) {
+ }
+
+
+ public override void Oneway(Command command)
+ {
lock (transmissionLock)
{
- this.next.Oneway(command);
- }
- }
-
- public override FutureResponse AsyncRequest(Command command)
- {
+ this.next.Oneway(command);
+ }
+ }
+
+ public override FutureResponse AsyncRequest(Command command)
+ {
lock (transmissionLock)
{
- return base.AsyncRequest(command);
- }
- }
-
- public override Response Request(Command command)
- {
+ return base.AsyncRequest(command);
+ }
+ }
+
+ public override Response Request(Command command)
+ {
lock (transmissionLock)
{
- return base.Request(command);
- }
- }
-
- public override void Dispose()
- {
+ return base.Request(command);
+ }
+ }
+
+ public override void Dispose()
+ {
lock (transmissionLock)
{
- base.Dispose();
- }
- }
-
+ base.Dispose();
+ }
+ }
+
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs Fri Sep 22 11:11:30 2006
@@ -24,46 +24,46 @@
namespace ActiveMQ.Transport
{
- /// <summary>
- /// A Transport which gaurds access to the next transport using a mutex.
- /// </summary>
- public class ResponseCorrelator : TransportFilter
+ /// <summary>
+ /// A Transport which gaurds access to the next transport using a mutex.
+ /// </summary>
+ public class ResponseCorrelator : TransportFilter
{
private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
private readonly Object mutex = new Object();
private short nextCommandId;
-
- public ResponseCorrelator(ITransport next) : base(next) {
- }
-
- short GetNextCommandId() {
- lock(mutex) {
- return ++nextCommandId;
- }
- }
-
- public override void Oneway(Command command)
- {
- command.CommandId = GetNextCommandId();
- command.ResponseRequired = false;
- next.Oneway(command);
- }
-
- public override FutureResponse AsyncRequest(Command command)
- {
- command.CommandId = GetNextCommandId();
- command.ResponseRequired = true;
- FutureResponse future = new FutureResponse();
- requestMap[command.CommandId] = future;
- next.Oneway(command);
- return future;
-
- }
-
- public override Response Request(Command command)
- {
- FutureResponse future = AsyncRequest(command);
+
+ public ResponseCorrelator(ITransport next) : base(next) {
+ }
+
+ short GetNextCommandId() {
+ lock(mutex) {
+ return ++nextCommandId;
+ }
+ }
+
+ public override void Oneway(Command command)
+ {
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = false;
+ next.Oneway(command);
+ }
+
+ public override FutureResponse AsyncRequest(Command command)
+ {
+ command.CommandId = GetNextCommandId();
+ command.ResponseRequired = true;
+ FutureResponse future = new FutureResponse();
+ requestMap[command.CommandId] = future;
+ next.Oneway(command);
+ return future;
+
+ }
+
+ public override Response Request(Command command)
+ {
+ FutureResponse future = AsyncRequest(command);
Response response = future.Response;
if (response is ExceptionResponse)
{
@@ -72,33 +72,41 @@
throw new BrokerException(brokerError);
}
return response;
- }
-
- protected override void OnCommand(ITransport sender, Command command)
- {
- if( command is Response ) {
-
- Response response = (Response) command;
- FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
- if (future != null)
- {
- if (response is ExceptionResponse)
- {
- ExceptionResponse er = (ExceptionResponse) response;
- BrokerError brokerError = er.Exception;
- this.exceptionHandler(this, new BrokerException(brokerError));
- }
- future.Response = response;
- }
- else
- {
- Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
- }
- } else {
- this.commandHandler(sender, command);
- }
- }
-
+ }
+
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+ if( command is Response ) {
+
+ Response response = (Response) command;
+ FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
+ if (future != null)
+ {
+ if (response is ExceptionResponse)
+ {
+ ExceptionResponse er = (ExceptionResponse) response;
+ BrokerError brokerError = er.Exception;
+ BrokerException exception = new BrokerException(brokerError);
+ this.exceptionHandler(this, exception);
+ }
+ future.Response = response;
+ }
+ else
+ {
+ if (command is ShutdownInfo)
+ {
+ // lets shutdown
+ this.commandHandler(sender, command);
+ }
+ else {
+ Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
+ }
+ }
+ } else {
+ this.commandHandler(sender, command);
+ }
+ }
+
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Fri Sep 22 11:11:30 2006
@@ -28,10 +28,10 @@
namespace ActiveMQ.Transport.Tcp
{
- /// <summary>
- /// An implementation of ITransport that uses sockets to communicate with the broker
- /// </summary>
- public class TcpTransport : ITransport
+ /// <summary>
+ /// An implementation of ITransport that uses sockets to communicate with the broker
+ /// </summary>
+ public class TcpTransport : ITransport
{
private Socket socket;
private OpenWireFormat wireformat = new OpenWireFormat();
@@ -46,7 +46,7 @@
public TcpTransport(Socket socket)
{
- this.socket = socket;
+ this.socket = socket;
}
/// <summary>
@@ -56,11 +56,11 @@
{
if (!started)
{
- if( commandHandler == null )
- throw new InvalidOperationException ("command cannot be null when Start is called.");
- if( exceptionHandler == null )
- throw new InvalidOperationException ("exception cannot be null when Start is called.");
-
+ if( commandHandler == null )
+ throw new InvalidOperationException ("command cannot be null when Start is called.");
+ if( exceptionHandler == null )
+ throw new InvalidOperationException ("exception cannot be null when Start is called.");
+
started = true;
NetworkStream networkStream = new NetworkStream(socket);
@@ -70,23 +70,13 @@
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start();
-
- // lets send the wireformat we're using
- WireFormatInfo info = new WireFormatInfo();
- info.StackTraceEnabled=false;
- info.TightEncodingEnabled=false;
- info.TcpNoDelayEnabled=false;
- info.CacheEnabled=false;
- info.SizePrefixDisabled=false;
-
- Oneway(info);
}
}
- public void Oneway(Command command)
+ public void Oneway(Command command)
{
- wireformat.Marshal(command, socketWriter);
- socketWriter.Flush();
+ wireformat.Marshal(command, socketWriter);
+ socketWriter.Flush();
}
public FutureResponse AsyncRequest(Command command)
@@ -101,9 +91,9 @@
public void Dispose()
{
- closed = true;
- socket.Close();
- readThread.Join();
+ closed = true;
+ socket.Close();
+ readThread.Join();
socketWriter.Close();
socketReader.Close();
}
@@ -115,36 +105,36 @@
try
{
Command command = (Command) wireformat.Unmarshal(socketReader);
- this.commandHandler(this, command);
+ this.commandHandler(this, command);
}
- catch (ObjectDisposedException)
+ catch (ObjectDisposedException)
{
break;
}
- catch ( Exception e) {
- if( e.GetBaseException() is ObjectDisposedException ) {
- break;
- }
- if( !closed ) {
- this.exceptionHandler(this,e);
- }
- break;
- }
+ catch ( Exception e) {
+ if( e.GetBaseException() is ObjectDisposedException ) {
+ break;
+ }
+ if( !closed ) {
+ this.exceptionHandler(this,e);
+ }
+ break;
+ }
}
}
// Implementation methods
- public CommandHandler Command {
+ public CommandHandler Command {
get { return commandHandler; }
set { this.commandHandler = value; }
}
-
+
public ExceptionHandler Exception {
get { return exceptionHandler; }
set { this.exceptionHandler = value; }
}
-
+
}
}
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Fri Sep 22 11:11:30 2006
@@ -18,6 +18,8 @@
using System;
using System.Net;
using System.Net.Sockets;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
using ActiveMQ.Transport;
namespace ActiveMQ.Transport.Tcp {
@@ -47,6 +49,8 @@
}
rc = new ResponseCorrelator(rc);
rc = new MutexTransport(rc);
+ rc = new WireFormatNegotiator(rc);
+
return rc;
}
Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?view=auto&rev=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Fri Sep 22 11:11:30 2006
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using ActiveMQ.Commands;
+using ActiveMQ.Transport;
+using System;
+
+namespace ActiveMQ.Transport
+{
+
+ /// <summary>
+ /// A Transport which negotiates the wire format
+ /// </summary>
+ public class WireFormatNegotiator : TransportFilter
+ {
+ public const int OPENWIRE_VERSION = 2;
+
+
+ public WireFormatNegotiator(ITransport next) : base(next) {
+ }
+
+ public override void Start() {
+ base.Start();
+
+
+ // now lets start the protocol negotiation
+ WireFormatInfo info = new WireFormatInfo();
+ info.StackTraceEnabled=false;
+ info.TightEncodingEnabled=false;
+ info.TcpNoDelayEnabled=false;
+ info.CacheEnabled=false;
+ info.SizePrefixDisabled=false;
+ info.Version = OPENWIRE_VERSION;
+
+ Oneway(info);
+ }
+ }
+}
+
Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs Fri Sep 22 11:11:30 2006
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
using System;
using System.Reflection;
using System.Runtime.InteropServices;
@@ -38,6 +22,6 @@
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0.2406.0")]
+[assembly: AssemblyVersionAttribute("4.0.2456.0")]
[assembly: AssemblyInformationalVersionAttribute("4.0")]
Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs Fri Sep 22 11:11:30 2006
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
using System;
using System.Reflection;
using System.Runtime.InteropServices;
@@ -38,6 +22,6 @@
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0.2406.0")]
+[assembly: AssemblyVersionAttribute("4.0.2456.0")]
[assembly: AssemblyInformationalVersionAttribute("4.0")]
Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs Fri Sep 22 11:11:30 2006
@@ -24,32 +24,35 @@
namespace NMS
{
- [ TestFixture ]
+ [ TestFixture ]
public abstract class JMSTestSupport
{
-
- private IConnectionFactory factory;
+
+ // enable/disable logging of message flows
+ protected bool logging = false;
+
+ private IConnectionFactory factory;
private IConnection connection;
- private ISession session;
- private IDestination destination;
-
- protected int receiveTimeout = 1000;
- protected string clientId;
- protected DestinationType destinationType = DestinationType.Queue;
- protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
-
- [SetUp]
+ private ISession session;
+ private IDestination destination;
+
+ protected int receiveTimeout = 1000;
+ protected string clientId;
+ protected DestinationType destinationType = DestinationType.Queue;
+ protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
+
+ [SetUp]
virtual public void SetUp()
{
}
-
+
[TearDown]
virtual public void TearDown()
{
- Disconnect();
+ Disconnect();
}
-
- // Properties
+
+ // Properties
public bool Connected
{
get { return connection!=null; }
@@ -59,57 +62,57 @@
public IConnectionFactory Factory
{
get {
- if( factory == null ) {
- factory = CreateConnectionFactory();
- Assert.IsNotNull(factory, "no factory created");
- }
- return factory;
- }
+ if( factory == null ) {
+ factory = CreateConnectionFactory();
+ Assert.IsNotNull(factory, "no factory created");
+ }
+ return factory;
+ }
set { this.factory = value; }
}
-
+
public IConnection Connection
{
get {
- if( connection == null ) {
- Connect();
- }
- return connection;
- }
+ if( connection == null ) {
+ Connect();
+ }
+ return connection;
+ }
set { this.connection = value; }
}
-
+
public ISession Session
{
get {
- if( session == null ) {
- session = Connection.CreateSession(acknowledgementMode);
- Assert.IsNotNull(connection != null, "no session created");
- }
- return session;
- }
+ if( session == null ) {
+ session = Connection.CreateSession(acknowledgementMode);
+ Assert.IsNotNull(connection != null, "no session created");
+ }
+ return session;
+ }
set { this.session = value; }
}
-
- virtual protected void Connect()
+
+ virtual protected void Connect()
{
- Console.WriteLine("Connectting...");
- connection = CreateConnection();
- Assert.IsNotNull(connection, "no connection created");
- connection.Start();
- Console.WriteLine("Connected.");
- Assert.IsNotNull(connection, "no connection created");
+ Console.WriteLine("Connectting...");
+ connection = CreateConnection();
+ Assert.IsNotNull(connection, "no connection created");
+ connection.Start();
+ Console.WriteLine("Connected.");
+ Assert.IsNotNull(connection, "no connection created");
}
-
+
virtual protected void Disconnect()
{
if (connection != null)
{
- Console.WriteLine("Disconnecting...");
+ Console.WriteLine("Disconnecting...");
connection.Dispose();
connection = null;
- session=null;
- Console.WriteLine("Disconnected.");
+ session=null;
+ Console.WriteLine("Disconnected.");
}
}
@@ -118,82 +121,82 @@
Disconnect();
Connect();
}
-
- protected virtual void Drain()
- {
+
+ protected virtual void Drain()
+ {
using (ISession session = Connection.CreateSession())
{
- // Tries to consume any messages on the Destination
- IMessageConsumer consumer = session.CreateConsumer(Destination);
-
- // Should only need to wait for first message to arrive due to the way
- // prefetching works.
- IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
- while (msg != null)
- {
- msg = consumer.ReceiveNoWait();
- }
- }
- }
-
+ // Tries to consume any messages on the Destination
+ IMessageConsumer consumer = session.CreateConsumer(Destination);
+
+ // Should only need to wait for first message to arrive due to the way
+ // prefetching works.
+ IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
+ while (msg != null)
+ {
+ msg = consumer.ReceiveNoWait();
+ }
+ }
+ }
+
public virtual void SendAndSyncReceive()
{
using (ISession session = Connection.CreateSession())
{
-
- IMessageConsumer consumer = session.CreateConsumer(Destination);
- IMessageProducer producer = session.CreateProducer(Destination);
-
- IMessage request = CreateMessage();
- producer.Send(request);
-
- IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
+
+ IMessageConsumer consumer = session.CreateConsumer(Destination);
+ IMessageProducer producer = session.CreateProducer(Destination);
+
+ IMessage request = CreateMessage();
+ producer.Send(request);
+
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
Assert.IsNotNull(message, "No message returned!");
AssertValidMessage(message);
}
}
-
- protected virtual IConnectionFactory CreateConnectionFactory()
- {
- return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616?logging=true"));
- }
-
- protected virtual IConnection CreateConnection()
- {
- IConnection connection = Factory.CreateConnection();
- if( clientId!=null ) {
- connection.ClientId = clientId;
- }
- return connection;
- }
-
- protected virtual IMessageProducer CreateProducer()
- {
- IMessageProducer producer = Session.CreateProducer(Destination);
- return producer;
- }
-
- protected virtual IMessageConsumer CreateConsumer()
- {
- IMessageConsumer consumer = Session.CreateConsumer(Destination);
- return consumer;
- }
+
+ protected virtual IConnectionFactory CreateConnectionFactory()
+ {
+ return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616?logging=" + logging));
+ }
+
+ protected virtual IConnection CreateConnection()
+ {
+ IConnection connection = Factory.CreateConnection();
+ if( clientId!=null ) {
+ connection.ClientId = clientId;
+ }
+ return connection;
+ }
+
+ protected virtual IMessageProducer CreateProducer()
+ {
+ IMessageProducer producer = Session.CreateProducer(Destination);
+ return producer;
+ }
+
+ protected virtual IMessageConsumer CreateConsumer()
+ {
+ IMessageConsumer consumer = Session.CreateConsumer(Destination);
+ return consumer;
+ }
protected virtual IDestination CreateDestination()
{
- if( destinationType == DestinationType.Queue ) {
- return Session.GetQueue(CreateDestinationName());
- } else if( destinationType == DestinationType.Topic ) {
- return Session.GetTopic(CreateDestinationName());
- } else if( destinationType == DestinationType.TemporaryQueue ) {
- return Session.CreateTemporaryQueue();
- } else if( destinationType == DestinationType.TemporaryTopic ) {
- return Session.CreateTemporaryTopic();
- } else {
- throw new Exception("Unknown destination type: "+destinationType);
- }
+ if( destinationType == DestinationType.Queue ) {
+ return Session.GetQueue(CreateDestinationName());
+ } else if( destinationType == DestinationType.Topic ) {
+ return Session.GetTopic(CreateDestinationName());
+ } else if( destinationType == DestinationType.TemporaryQueue ) {
+ return Session.CreateTemporaryQueue();
+ } else if( destinationType == DestinationType.TemporaryTopic ) {
+ return Session.CreateTemporaryTopic();
+ } else {
+ throw new Exception("Unknown destination type: "+destinationType);
+ }
}
-
+
protected virtual string CreateDestinationName()
{
return "Test.DotNet." + GetType().Name;
@@ -208,24 +211,24 @@
{
Assert.IsNotNull(message, "Null Message!");
}
-
-
+
+
public IDestination Destination
{
get {
- if (destination == null)
- {
- destination = CreateDestination();
- Assert.IsNotNull(destination, "No destination available!");
- Console.WriteLine("Using destination: " + destination);
- }
- return destination;
- }
+ if (destination == null)
+ {
+ destination = CreateDestination();
+ Assert.IsNotNull(destination, "No destination available!");
+ Console.WriteLine("Using destination: " + destination);
+ }
+ return destination;
+ }
set {
- destination = value;
+ destination = value;
}
}
-
+
}
}