You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2009/06/01 22:14:42 UTC
svn commit: r780810 - in /activemq/activemq-dotnet:
Apache.NMS.ActiveMQ/trunk/ Apache.NMS.ActiveMQ/trunk/src/main/csharp/
Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/
Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multi...
Author: jgomes
Date: Mon Jun 1 20:14:42 2009
New Revision: 780810
URL: http://svn.apache.org/viewvc?rev=780810&view=rev
Log:
Refactored unit tests to support substitution of environment variables (e.g. ${activemqhost}) in the configuration file.
Refactored durable consumer tests.
Expanded URI tests.
Fixed broken URI parameter parsing.
Fixes [AMQNET-150]. (See https://issues.apache.org/activemq/browse/AMQNET-150)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nmsprovider-test.config Mon Jun 1 20:14:42 2009
@@ -16,11 +16,11 @@
* limitations under the License.
-->
<configuration>
- <defaultURI value="activemq:tcp://activemqhost:61616"/>
+ <defaultURI value="activemq:tcp://${activemqhost}:61616?connection.AsyncClose=false"/>
- <maxInactivityDurationURI value="activemq:tcp://activemqhost:61616?wireFormat.MaxInactivityDuration=30000"/>
+ <maxInactivityDurationURI value="activemq:tcp://${activemqhost}:61616?wireFormat.MaxInactivityDuration=30000&connection.AsyncClose=false"/>
- <openWireURI value="activemq:tcp://activemqhost:61616">
+ <openWireURI value="activemq:tcp://${activemqhost}:61616?connection.AsyncClose=false">
<factoryParams>
<param type="string" value="OpenWireTestClient"/>
</factoryParams>
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Mon Jun 1 20:14:42 2009
@@ -212,8 +212,12 @@
}
sessions.Clear();
- DisposeOf(ConnectionId);
- transport.Oneway(new ShutdownInfo());
+ if(connected)
+ {
+ DisposeOf(ConnectionId);
+ transport.Oneway(new ShutdownInfo());
+ }
+
transport.Dispose();
}
catch(Exception ex)
@@ -224,6 +228,7 @@
{
this.transport = null;
this.closed = true;
+ this.connected = false;
this.closing = false;
}
}
@@ -352,6 +357,7 @@
command.ObjectId = objectId;
if(asyncClose)
{
+ Tracer.Info("Asynchronously closing Connection.");
OneWay(command);
}
else
@@ -361,6 +367,7 @@
// the broker can dispose of the object. Allow up to 5 seconds to process.
try
{
+ Tracer.Info("Synchronously closing Connection...");
SyncRequest(command, TimeSpan.FromSeconds(5));
}
catch // (BrokerException)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Mon Jun 1 20:14:42 2009
@@ -27,7 +27,7 @@
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
- public const string DEFAULT_BROKER_URL = "activemq:tcp://localhost:61616";
+ public const string DEFAULT_BROKER_URL = "tcp://localhost:61616";
public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
private static event ExceptionListener onException;
@@ -83,18 +83,15 @@
public IConnection CreateConnection(string userName, string password)
{
- Uri uri = brokerUri;
- string scheme = brokerUri.Scheme;
+ return CreateConnection(userName, password, true);
+ }
- if(null != scheme)
- {
- // Do we need to strip off the activemq prefix??
- scheme = scheme.ToLower();
- if("activemq".Equals(scheme))
- {
- uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
- }
- }
+ public IConnection CreateConnection(string userName, string password, bool startTransport)
+ {
+ // Strip off the activemq prefix, if it exists.
+ Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "activemq:"));
+
+ Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
ConnectionInfo info = CreateConnectionInfo(userName, password);
ITransport transport = TransportFactory.CreateTransport(uri);
@@ -104,10 +101,14 @@
// Since this could be a composite Uri, assume the connection-specific parameters
// are associated with the outer-most specification of the composite Uri. What's nice
// is that this works with simple Uri as well.
- URISupport.CompositeData c = URISupport.parseComposite(brokerUri);
+ URISupport.CompositeData c = URISupport.parseComposite(uri);
URISupport.SetProperties(connection, c.Parameters, "connection.");
- connection.ITransport.Start();
+ if(startTransport)
+ {
+ connection.ITransport.Start();
+ }
+
return connection;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs Mon Jun 1 20:14:42 2009
@@ -30,7 +30,7 @@
private static MulticastDiscoveryAgent agent;
private static string currentServiceName;
private static readonly object uriLock = new object();
- private static readonly AutoResetEvent uriDiscoveredEvent = new AutoResetEvent(false);
+ private static readonly AutoResetEvent discoveredUriEvent = new AutoResetEvent(false);
private static event ExceptionListener OnException;
static DiscoveryTransportFactory()
@@ -64,15 +64,18 @@
}
// This will end the wait in the CreateTransport method.
- uriDiscoveredEvent.Set();
+ discoveredUriEvent.Set();
}
private static void agent_OnServiceRemoved(string brokerName, string serviceName)
{
- if(serviceName == currentServiceName)
+ lock(uriLock)
{
- DiscoveredUri = null;
- DiscoveryTransportFactory.OnException(new Exception("Broker connection is no longer valid."));
+ if(serviceName == currentServiceName)
+ {
+ DiscoveredUri = null;
+ DiscoveryTransportFactory.OnException(new Exception("Broker connection is no longer valid."));
+ }
}
}
@@ -84,19 +87,22 @@
{
agent.Start();
}
+
+ Uri hostUri = DiscoveredUri;
- if(null == DiscoveredUri)
+ if(null == hostUri)
{
// If a new broker is found the agent will fire an event which will result in discoveredUri being set.
- uriDiscoveredEvent.WaitOne(TIMEOUT_IN_SECONDS * 1000, true);
- if(null == DiscoveredUri)
+ discoveredUriEvent.WaitOne(TIMEOUT_IN_SECONDS * 1000, true);
+ hostUri = DiscoveredUri;
+ if(null == hostUri)
{
- throw new NMSConnectionException("Unable to find a connection before the timeout period expired.");
+ throw new NMSConnectionException(String.Format("Unable to find a connection to {0} before the timeout period expired.", location.ToString()));
}
}
TcpTransportFactory tcpTransFactory = new TcpTransportFactory();
- return tcpTransFactory.CreateTransport(new Uri(DiscoveredUri + location.Query));
+ return tcpTransFactory.CreateTransport(new Uri(hostUri + location.Query));
}
public ITransport CompositeConnect(Uri location)
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs Mon Jun 1 20:14:42 2009
@@ -20,6 +20,7 @@
using System.Net;
using System.Net.Sockets;
using System.Threading;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
{
@@ -69,46 +70,43 @@
{
lock(stopstartSemaphore)
{
- if(!isStarted)
+ if(multicastSocket == null)
{
- Tracer.Info("Starting multicast discovery agent worker thread");
- isStarted = true;
+ int numFailedAttempts = 0;
+ int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
- if(multicastSocket == null)
+ Tracer.Info("Connecting to multicast discovery socket.");
+ while(!TryToConnectSocket())
{
- int numFailedAttempts = 0;
- int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
-
- while(!TryToConnectSocket())
+ numFailedAttempts++;
+ if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
{
- numFailedAttempts++;
- if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
- {
- throw new ApplicationException(
- "Could not open the socket in order to discover advertising brokers.");
- }
-
- Thread.Sleep(backoffTime);
- backoffTime *= BACKOFF_MULTIPLIER;
+ throw new ApplicationException(
+ "Could not open the socket in order to discover advertising brokers.");
}
- }
- if(worker == null)
- {
- worker = new Thread(new ThreadStart(worker_DoWork));
- worker.Start();
+ Thread.Sleep(backoffTime);
+ backoffTime *= BACKOFF_MULTIPLIER;
}
}
+
+ if(worker == null)
+ {
+ Tracer.Info("Starting multicast discovery agent worker thread");
+ worker = new Thread(new ThreadStart(worker_DoWork));
+ worker.Start();
+ isStarted = true;
+ }
}
}
public void Stop()
{
- Tracer.Info("Stopping multicast discovery agent worker thread");
Thread localThread = null;
lock(stopstartSemaphore)
{
+ Tracer.Info("Stopping multicast discovery agent worker thread");
localThread = worker;
worker = null;
// Changing the isStarted flag will signal the thread that it needs to shut down.
@@ -141,10 +139,19 @@
multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
multicastSocket.Bind(endPoint);
- IPAddress ip = IPAddress.Parse(discoveryUri.Host);
+ IPAddress ipaddress;
+
+ if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
+ {
+ ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
+ if(null == ipaddress)
+ {
+ throw new NMSConnectionException("Invalid host address.");
+ }
+ }
multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
- new MulticastOption(ip, IPAddress.Any));
+ new MulticastOption(ipaddress, IPAddress.Any));
#if !NETCF
multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
#endif
@@ -173,7 +180,6 @@
// We have to remove all of the null bytes.
receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
ProcessBrokerMessage(receivedInfo);
-
}
catch(SocketException)
{
@@ -331,7 +337,6 @@
{
this.lastHeartBeat = DateTime.Now;
}
-
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Mon Jun 1 20:14:42 2009
@@ -200,7 +200,7 @@
return null;
}
- private static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+ public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
{
#if !NETCF
return IPAddress.TryParse(host, out ipaddress);
@@ -218,7 +218,20 @@
#endif
}
- private static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+ public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+ {
+ IPAddress ipaddress = null;
+ IPHostEntry hostEntry = GetIPHostEntry(hostname);
+
+ if(null != hostEntry)
+ {
+ ipaddress = GetIPAddress(hostEntry, addressFamily);
+ }
+
+ return ipaddress;
+ }
+
+ public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
{
if(null != hostEntry)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Mon Jun 1 20:14:42 2009
@@ -18,6 +18,7 @@
using System;
using NUnit.Framework;
using NUnit.Framework.Extensions;
+using Apache.NMS.Test;
namespace Apache.NMS.ActiveMQ.Test
{
@@ -25,34 +26,40 @@
public class NMSConnectionFactoryTest
{
[RowTest]
- [Row("tcp://activemqhost:61616")]
- [Row("activemq:tcp://activemqhost:61616")]
- [Row("activemq:tcp://activemqhost:61616?connection.asyncclose=false")]
- [Row("activemq:(tcp://activemqhost:61616)?connection.asyncclose=false")]
- [Row("activemq:multicast://activemqhost:6155")]
- [Row("activemq:failover://activemqhost:61616")]
- [Row("activemq:failover://(tcp://activemqhost:61616,tcp://activemqhost:61616)")]
+ [Row("tcp://${activemqhost}:61616")]
+ [Row("activemq:tcp://${activemqhost}:61616")]
+ [Row("activemq:tcp://${activemqhost}:61616?connection.asyncclose=false")]
+ [Row("activemq:failover:tcp://${activemqhost}:61616")]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616,tcp://${activemqhost}:61616)")]
+ [Row("activemq:discovery://${activemqhost}:6155")]
- [Row("tcp://activemqhost:61616?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
- [Row("activemq:tcp://activemqhost:61616?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
- [Row("activemq:(tcp://activemqhost:61616)?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
- [Row("activemq:(tcp://activemqhost:61616?connection.InvalidParameter=true)", ExpectedException = typeof(NMSException))]
- [Row("activemq:(tcp://activemqhost:61616,tcp://activemqbackup:61616)?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
- [Row("activemq:(tcp://activemqhost:61616,tcp://activemqbackup:61616?connection.InvalidParameter=true)", ExpectedException = typeof(NMSException))]
+ [Row("tcp://${activemqhost}:61616?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
+ [Row("activemq:tcp://${activemqhost}:61616?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:tcp://${activemqhost}:61616?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616)?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616?connection.InvalidParameter=true)", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616,tcp://${activemqbackuphost}:61616)?connection.InvalidParameter=true", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616?connection.InvalidParameter=true,tcp://${activemqbackuphost}:61616)", ExpectedException = typeof(NMSException))]
+ [Row("activemq:failover:(tcp://${activemqhost}:61616,tcp://${activemqbackuphost}:61616?connection.InvalidParameter=true)", ExpectedException = typeof(NMSException))]
- [Row("ftp://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
- [Row("http://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
- [Row("discovery://activemqhost:6155", ExpectedException = typeof(NMSConnectionException))]
- [Row("sms://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
+ [Row("ftp://${activemqhost}:61616", ExpectedException = typeof(NMSConnectionException))]
+ [Row("http://${activemqhost}:61616", ExpectedException = typeof(NMSConnectionException))]
+ [Row("discovery://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
+ [Row("sms://${activemqhost}:61616", ExpectedException = typeof(NMSConnectionException))]
+ [Row("activemq:multicast://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
- [Row("(tcp://activemqhost:61616,tcp://activemqhost:61616)", ExpectedException = typeof(UriFormatException))]
- [Row("tcp://activemqhost:61616,tcp://activemqhost:61616", ExpectedException = typeof(UriFormatException))]
+ [Row("activemq:(tcp://${activemqhost}:61616)?connection.asyncclose=false", ExpectedException = typeof(UriFormatException))]
+ [Row("(tcp://${activemqhost}:61616,tcp://${activemqhost}:61616)", ExpectedException = typeof(UriFormatException))]
+ [Row("tcp://${activemqhost}:61616,tcp://${activemqhost}:61616", ExpectedException = typeof(UriFormatException))]
public void TestURI(string connectionURI)
{
- NMSConnectionFactory factory = new NMSConnectionFactory(connectionURI);
+ NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(connectionURI));
Assert.IsNotNull(factory);
- Assert.IsNotNull(factory.ConnectionFactory);
- Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+ Apache.NMS.ActiveMQ.ConnectionFactory activemqFactory = factory.ConnectionFactory as Apache.NMS.ActiveMQ.ConnectionFactory;
+ Assert.IsNotNull(activemqFactory);
+ using(IConnection connection = activemqFactory.CreateConnection("", "", false))
+ {
+ }
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs Mon Jun 1 20:14:42 2009
@@ -305,11 +305,6 @@
ssp = stripPrefix(ssp, rc.Scheme).Trim();
ssp = stripPrefix(ssp, ":").Trim();
}
- else
- {
- // Fake a composite URL with parenthesis
- ssp = "(" + ssp + ")";
- }
// Handle the composite components
parseComposite(uri, rc, ssp);
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/ConsumerTest.cs Mon Jun 1 20:14:42 2009
@@ -26,76 +26,16 @@
[TestFixture]
public class ConsumerTest : NMSTestSupport
{
- protected static string TEST_CLIENT_ID = "ConsumerTestClientId";
- protected static string TOPIC = "TestTopicConsumerTest";
- protected static string CONSUMER_ID = "ConsumerTestConsumerId";
-
-#if !NET_1_1
- [RowTest]
- [Row(MsgDeliveryMode.Persistent)]
- [Row(MsgDeliveryMode.NonPersistent)]
-#endif
- public void TestDurableConsumerSelectorChange(MsgDeliveryMode deliveryMode)
- {
- try
- {
- using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
- {
- connection.Start();
- using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
- {
- ITopic topic = SessionUtil.GetTopic(session, TOPIC);
- IMessageProducer producer = session.CreateProducer(topic);
- IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='red'", false);
-
- producer.DeliveryMode = deliveryMode;
-
- // Send the messages
- ITextMessage sendMessage = session.CreateTextMessage("1st");
- sendMessage.Properties["color"] = "red";
- producer.Send(sendMessage);
-
- ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
- Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
- Assert.AreEqual("1st", receiveMsg.Text);
- Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
-
- // Change the subscription.
- consumer.Dispose();
- consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='blue'", false);
-
- sendMessage = session.CreateTextMessage("2nd");
- sendMessage.Properties["color"] = "red";
- producer.Send(sendMessage);
- sendMessage = session.CreateTextMessage("3rd");
- sendMessage.Properties["color"] = "blue";
- producer.Send(sendMessage);
-
- // Selector should skip the 2nd message.
- receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
- Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
- Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
- Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
-
- // Make sure there are no pending messages.
- Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
- }
- }
- }
- catch(Exception ex)
- {
- Assert.Fail(ex.Message);
- }
- finally
- {
- UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
- }
- }
+ protected static string TEST_CLIENT_ID = "TestConsumerClientId";
// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
#if !NETCF
- [Test]
- public void TestNoTimeoutConsumer()
+ [RowTest]
+ [Row(AcknowledgementMode.AutoAcknowledge)]
+ [Row(AcknowledgementMode.ClientAcknowledge)]
+ [Row(AcknowledgementMode.DupsOkAcknowledge)]
+ [Row(AcknowledgementMode.Transactional)]
+ public void TestNoTimeoutConsumer(AcknowledgementMode ackMode)
{
// Launch a thread to perform IMessageConsumer.Receive().
// If it doesn't fail in less than three seconds, no exception was thrown.
@@ -103,7 +43,7 @@
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
- using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession session = connection.CreateSession(ackMode))
{
ITemporaryQueue queue = session.CreateTemporaryQueue();
using(this.timeoutConsumer = session.CreateConsumer(queue))
@@ -147,8 +87,12 @@
}
}
- [Test]
- public void TestSyncReceiveConsumerClose()
+ [RowTest]
+ [Row(AcknowledgementMode.AutoAcknowledge)]
+ [Row(AcknowledgementMode.ClientAcknowledge)]
+ [Row(AcknowledgementMode.DupsOkAcknowledge)]
+ [Row(AcknowledgementMode.Transactional)]
+ public void TestSyncReceiveConsumerClose(AcknowledgementMode ackMode)
{
// Launch a thread to perform IMessageConsumer.Receive().
// If it doesn't fail in less than three seconds, no exception was thrown.
@@ -156,7 +100,7 @@
using (IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
- using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (ISession session = connection.CreateSession(ackMode))
{
ITemporaryQueue queue = session.CreateTemporaryQueue();
using (this.timeoutConsumer = session.CreateConsumer(queue))
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/DurableTest.cs Mon Jun 1 20:14:42 2009
@@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using NUnit.Framework;
+using NUnit.Framework.Extensions;
using Apache.NMS.Util;
namespace Apache.NMS.Test
@@ -23,74 +25,98 @@
[TestFixture]
public class DurableTest : NMSTestSupport
{
- protected static string TOPIC = "TestTopicDurableConsumer";
- protected static string SEND_CLIENT_ID = "SendDurableTestClientId";
- protected static string TEST_CLIENT_ID = "DurableTestClientId";
- protected static string CONSUMER_ID = "DurableTestConsumerId";
+ protected static string TEST_CLIENT_ID = "TestDurableConsumerClientId";
+ protected static string SEND_CLIENT_ID = "TestDurableProducerClientId";
+ protected static string DURABLE_TOPIC = "TestDurableConsumerTopic";
+ protected static string CONSUMER_ID = "TestDurableConsumerConsumerId";
protected static string DURABLE_SELECTOR = "2 > 1";
- protected void SendPersistentMessage()
- {
- using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
- {
- connection.Start();
- using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
- {
- ITopic topic = SessionUtil.GetTopic(session, TOPIC);
- using(IMessageProducer producer = session.CreateProducer(topic))
- {
- ITextMessage message = session.CreateTextMessage("Persistent Hello");
-
- producer.DeliveryMode = MsgDeliveryMode.Persistent;
- producer.RequestTimeout = receiveTimeout;
- producer.Send(message);
- }
- }
- }
- }
-
- [Test]
- public void TestDurableConsumer()
+#if !NET_1_1
+ [RowTest]
+ [Row(MsgDeliveryMode.Persistent)]
+ [Row(MsgDeliveryMode.NonPersistent)]
+#endif
+ public void TestDurableConsumerSelectorChange(MsgDeliveryMode deliveryMode)
{
try
{
- RegisterDurableConsumer(TEST_CLIENT_ID, TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
- SendPersistentMessage();
-
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
- using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
+ using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
- ITopic topic = SessionUtil.GetTopic(session, TOPIC);
- using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, DURABLE_SELECTOR, false))
- {
- IMessage msg = consumer.Receive(receiveTimeout);
- Assert.IsNotNull(msg, "Did not receive first durable message.");
- msg.Acknowledge();
-
- SendPersistentMessage();
- msg = consumer.Receive(receiveTimeout);
- Assert.IsNotNull(msg, "Did not receive second durable message.");
- msg.Acknowledge();
- }
+ ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
+ IMessageProducer producer = session.CreateProducer(topic);
+ IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='red'", false);
+
+ producer.DeliveryMode = deliveryMode;
+
+ // Send the messages
+ ITextMessage sendMessage = session.CreateTextMessage("1st");
+ sendMessage.Properties["color"] = "red";
+ producer.Send(sendMessage);
+
+ ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+ Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
+ Assert.AreEqual("1st", receiveMsg.Text);
+ Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+ receiveMsg.Acknowledge();
+
+ // Change the subscription.
+ consumer.Dispose();
+ consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='blue'", false);
+
+ sendMessage = session.CreateTextMessage("2nd");
+ sendMessage.Properties["color"] = "red";
+ producer.Send(sendMessage);
+ sendMessage = session.CreateTextMessage("3rd");
+ sendMessage.Properties["color"] = "blue";
+ producer.Send(sendMessage);
+
+ // Selector should skip the 2nd message.
+ receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+ Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
+ Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
+ Assert.AreEqual(deliveryMode, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+ receiveMsg.Acknowledge();
+
+ // Make sure there are no pending messages.
+ Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
}
}
}
+ catch(Exception ex)
+ {
+ Assert.Fail(ex.Message);
+ }
finally
{
UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
}
}
- [Test]
- public void TestDurableConsumerTransactional()
+#if !NET_1_1
+ [RowTest]
+ [Row(MsgDeliveryMode.Persistent, AcknowledgementMode.AutoAcknowledge)]
+ [Row(MsgDeliveryMode.Persistent, AcknowledgementMode.ClientAcknowledge)]
+ [Row(MsgDeliveryMode.Persistent, AcknowledgementMode.DupsOkAcknowledge)]
+ [Row(MsgDeliveryMode.Persistent, AcknowledgementMode.Transactional)]
+
+ [Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.AutoAcknowledge)]
+ [Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.ClientAcknowledge)]
+ [Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.DupsOkAcknowledge)]
+ [Row(MsgDeliveryMode.NonPersistent, AcknowledgementMode.Transactional)]
+#endif
+ public void TestDurableConsumer(MsgDeliveryMode deliveryMode, AcknowledgementMode ackMode)
{
try
{
- RegisterDurableConsumer(TEST_CLIENT_ID, TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
- RunTestDurableConsumerTransactional();
- RunTestDurableConsumerTransactional();
+ RegisterDurableConsumer(TEST_CLIENT_ID, DURABLE_TOPIC, CONSUMER_ID, DURABLE_SELECTOR, false);
+ RunTestDurableConsumer(deliveryMode, ackMode);
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ RunTestDurableConsumer(deliveryMode, ackMode);
+ }
}
finally
{
@@ -98,27 +124,51 @@
}
}
- protected void RunTestDurableConsumerTransactional()
+ protected void RunTestDurableConsumer(MsgDeliveryMode deliveryMode, AcknowledgementMode ackMode)
{
- SendPersistentMessage();
+ SendDurableMessage(deliveryMode);
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
{
- ITopic topic = SessionUtil.GetTopic(session, TOPIC);
+ ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, DURABLE_SELECTOR, false))
{
IMessage msg = consumer.Receive(receiveTimeout);
- Assert.IsNotNull(msg, "Did not receive first durable transactional message.");
+ Assert.IsNotNull(msg, "Did not receive first durable message.");
msg.Acknowledge();
- SendPersistentMessage();
+ SendDurableMessage(deliveryMode);
msg = consumer.Receive(receiveTimeout);
- Assert.IsNotNull(msg, "Did not receive second durable transactional message.");
+ Assert.IsNotNull(msg, "Did not receive second durable message.");
msg.Acknowledge();
- session.Commit();
+
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+ }
+ }
+ }
+ }
+
+ protected void SendDurableMessage(MsgDeliveryMode deliveryMode)
+ {
+ using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
+ {
+ connection.Start();
+ using(ISession session = connection.CreateSession(AcknowledgementMode.DupsOkAcknowledge))
+ {
+ ITopic topic = SessionUtil.GetTopic(session, DURABLE_TOPIC);
+ using(IMessageProducer producer = session.CreateProducer(topic))
+ {
+ ITextMessage message = session.CreateTextMessage("Durable Hello");
+
+ producer.DeliveryMode = deliveryMode;
+ producer.RequestTimeout = receiveTimeout;
+ producer.Send(message);
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs?rev=780810&r1=780809&r2=780810&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/NMSTestSupport.cs Mon Jun 1 20:14:42 2009
@@ -21,6 +21,7 @@
using System.IO;
using System.Xml;
using System.Collections;
+using System.Text.RegularExpressions;
namespace Apache.NMS.Test
{
@@ -113,11 +114,12 @@
if(null != uriNode)
{
- brokerUri = new Uri(uriNode.GetAttribute("value"));
+ // Replace any environment variables embedded inside the string.
+ brokerUri = new Uri(ReplaceEnvVar(uriNode.GetAttribute("value")));
factoryParams = GetFactoryParams(uriNode);
- clientId = GetNodeValueAttribute(uriNode, "clientId", "NMSTestClientId");
- userName = GetNodeValueAttribute(uriNode, "userName", "guest");
- passWord = GetNodeValueAttribute(uriNode, "passWord", "guest");
+ clientId = ReplaceEnvVar(GetNodeValueAttribute(uriNode, "clientId", "NMSTestClientId"));
+ userName = ReplaceEnvVar(GetNodeValueAttribute(uriNode, "userName", "guest"));
+ passWord = ReplaceEnvVar(GetNodeValueAttribute(uriNode, "passWord", "guest"));
if(null == factoryParams)
{
@@ -151,7 +153,7 @@
foreach(XmlElement paramNode in nodeList)
{
string paramType = paramNode.GetAttribute("type");
- string paramValue = paramNode.GetAttribute("value");
+ string paramValue = ReplaceEnvVar(paramNode.GetAttribute("value"));
switch(paramType)
{
@@ -177,6 +179,13 @@
return null;
}
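+ /// <summary>
+ /// Looks up the child node selected by <paramref name="nodeName"/> under <paramref name="parentNode"/>; judging by the method name, it yields that node's "value" attribute (confirm against the method body).
+ /// </summary>
+ /// <param name="parentNode">Element under which the node is looked up.</param>
+ /// <param name="nodeName">XPath expression passed to SelectSingleNode to pick the child node.</param>
+ /// <param name="dflt">Default value; presumably returned when the node or its value is missing (confirm against the method body).</param>
+ /// <returns>The resolved string, or presumably <paramref name="dflt"/> when nothing is found.</returns>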
protected static string GetNodeValueAttribute(XmlElement parentNode, string nodeName, string dflt)
{
XmlElement node = (XmlElement) parentNode.SelectSingleNode(nodeName);
@@ -195,6 +204,42 @@
}
/// <summary>
+ /// Replace embedded variable markups with environment variable values.
+ /// Variable markups are of the following form:
+ /// ${varname}
+ /// </summary>
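+ /// <param name="srcText">Text that may contain ${varname} markups.</param>
+ /// <returns>The text with every supported markup replaced by its environment variable value, or by the variable's default when it is not set.</returns>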
+ public static string ReplaceEnvVar(string srcText)
+ {
+ // TODO: This should be refactored to be more generic and support arbitrary variable
+ // names that can be pulled from the environment. Currently, only the following
+ // hard-coded variable names are supported (the markup is matched case-insensitively):
+ //
+ // "${activemqhost}" - reads environment variable ActiveMQHost; defaults to "localhost".
+ // "${activemqbackuphost}" - reads environment variable ActiveMQBackupHost; defaults to "localhost".
+
+ srcText = ReplaceEnvVar(srcText, "ActiveMQHost", "localhost");
+ srcText = ReplaceEnvVar(srcText, "ActiveMQBackupHost", "localhost");
+ return srcText;
+ }
+
+ public static string ReplaceEnvVar(string srcText, string varName, string defaultValue)
+ {
+#if (PocketPC||NETCF||NETCF_2_0)
+ string replacementValue = null;
+#else
+ string replacementValue = Environment.GetEnvironmentVariable(varName);
+#endif
+ if(null == replacementValue)
+ {
+ replacementValue = defaultValue;
+ }
+
+ return Regex.Replace(srcText, "\\${" + varName + "}", replacementValue, RegexOptions.IgnoreCase);
+ }
+
+ /// <summary>
/// Create a new connection to the broker.
/// </summary>
/// <returns></returns>