You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/07 23:31:31 UTC
svn commit: r897036 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk:
./ src/main/csharp/ src/test/csharp/ src/test/csharp/Threads/
Author: tabish
Date: Thu Jan 7 22:31:30 2010
New Revision: 897036
URL: http://svn.apache.org/viewvc?rev=897036&view=rev
Log:
Adds async Acks option to Connection and a SendAcks method in Session. MessageConsumer modified to call SendAcks. Refactored the ConnectionFactory and Connection to make them more like the ActiveMQ versions.
Adds Tests for ConnectionFactory through the NMSConnectionFactory and a test case for the DedicatedTaskRunner.
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Thu Jan 7 22:31:30 2010
@@ -20,6 +20,7 @@
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Transport;
+using Apache.NMS.Stomp.Util;
using Apache.NMS;
using Apache.NMS.Util;
@@ -30,17 +31,25 @@
/// </summary>
public class Connection : IConnection
{
+ private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
private readonly Uri brokerUri;
private ITransport transport;
private readonly ConnectionInfo info;
+
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ private bool asyncSend = false;
+ private bool alwaysSyncSend = false;
+ private bool copyMessageOnSend = true;
+ private bool sendAcksAsync = false;
+ private IRedeliveryPolicy redeliveryPolicy;
+ private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+ private bool userSpecifiedClientID;
private TimeSpan requestTimeout;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
- private bool asyncSend = false;
- private bool alwaysSyncSend = false;
- private bool copyMessageOnSend = true;
private bool connected = false;
private bool closed = false;
private bool closing = false;
@@ -50,19 +59,25 @@
private readonly Atomic<bool> started = new Atomic<bool>(false);
private ConnectionMetaData metaData = null;
private bool disposed = false;
- private IRedeliveryPolicy redeliveryPolicy;
- private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+ private IdGenerator clientIdGenerator;
- public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+ public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
this.brokerUri = connectionUri;
- this.info = info;
this.requestTimeout = transport.RequestTimeout;
+ this.clientIdGenerator = clientIdGenerator;
+
this.transport = transport;
this.transport.Command = new CommandHandler(OnCommand);
this.transport.Exception = new ExceptionHandler(OnException);
this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+ ConnectionId id = new ConnectionId();
+ id.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+ this.info = new ConnectionInfo();
+ this.info.ConnectionId = id;
}
~Connection()
@@ -89,6 +104,18 @@
#region Properties
+ public String UserName
+ {
+ get { return this.info.UserName; }
+ set { this.info.UserName = value; }
+ }
+
+ public String Password
+ {
+ get { return this.info.Password; }
+ set { this.info.Password = value; }
+ }
+
/// <summary>
/// This property indicates whether or not async send is enabled.
/// </summary>
@@ -134,6 +161,17 @@
set { copyMessageOnSend = value; }
}
+ /// <summary>
+ /// This property indicates whether or not async sends are used for
+ /// message acknowledgement messages. Sending Acks async can improve
+ /// performance but may decrease reliability.
+ /// </summary>
+ public bool SendAcksAsync
+ {
+ get { return sendAcksAsync; }
+ set { sendAcksAsync = value; }
+ }
+
public IConnectionMetaData MetaData
{
get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
@@ -167,11 +205,26 @@
get { return info.ClientId; }
set
{
- if(connected)
+ if(this.connected)
{
throw new NMSException("You cannot change the ClientId once the Connection is connected");
}
- info.ClientId = value;
+
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ CheckConnected();
+ }
+ }
+
+ /// <summary>
+ /// The Default Client Id used if the ClientId property is not set explicity.
+ /// </summary>
+ public string DefaultClientId
+ {
+ set
+ {
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
}
}
@@ -432,6 +485,11 @@
if(!connected)
{
+ if(!this.userSpecifiedClientID)
+ {
+ this.info.ClientId = this.clientIdGenerator.GenerateId();
+ }
+
connected = true;
// now lets send the connection and see if we get an ack/nak
if(null == SyncRequest(info))
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Thu Jan 7 22:31:30 2010
@@ -16,6 +16,7 @@
*/
using System;
+using Apache.NMS.Stomp.Util;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Stomp.Transport;
using Apache.NMS;
@@ -30,13 +31,21 @@
public class ConnectionFactory : IConnectionFactory
{
public const string DEFAULT_BROKER_URL = "tcp://localhost:61613";
- public const string ENV_BROKER_URL = "STOMP_BROKER_URL";
+ public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
private static event ExceptionListener onException;
private Uri brokerUri;
private string connectionUserName;
private string connectionPassword;
private string clientId;
+ private string clientIdPrefix;
+ private IdGenerator clientIdGenerator;
+
+ private bool copyMessageOnSend = true;
+ private bool asyncSend;
+ private bool alwaysSyncSend;
+ private bool sendAcksAsync=true;
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -77,8 +86,8 @@
public ConnectionFactory(Uri brokerUri, string clientID)
{
- this.brokerUri = brokerUri;
- this.clientId = clientID;
+ this.BrokerUri = brokerUri;
+ this.ClientId = clientID;
}
public IConnection CreateConnection()
@@ -88,38 +97,60 @@
public IConnection CreateConnection(string userName, string password)
{
- // Strip off the activemq prefix, if it exists.
- Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
+ Connection connection = null;
- Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
+ try
+ {
+ // Strip off the activemq prefix, if it exists.
+ Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
- ConnectionInfo info = CreateConnectionInfo(userName, password);
- ITransport transport = TransportFactory.CreateTransport(uri);
- Connection connection = new Connection(uri, transport, info);
+ Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
- // Set the Factory level configuration to the Connection, this can be overriden by
- // the params on the Connection URI so we do this before applying the params.
- connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
- connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+ ITransport transport = TransportFactory.CreateTransport(uri);
- // Set properties on connection using parameters prefixed with "connection."
- // Since this could be a composite Uri, assume the connection-specific parameters
- // are associated with the outer-most specification of the composite Uri. What's nice
- // is that this works with simple Uri as well.
- URISupport.CompositeData c = URISupport.parseComposite(uri);
- URISupport.SetProperties(connection, c.Parameters, "connection.");
+ connection = new Connection(uri, transport, this.ClientIdGenerator);
- // Apply any URI options to the Prefetch policy in the Connection.
- URISupport.SetProperties(connection.PrefetchPolicy, c.Parameters, "nms.PrefetchPolicy.");
+ ConfigureConnection(connection);
- // Apply any URI options to the Redelivery policy in the Connection.
- URISupport.SetProperties(connection.RedeliveryPolicy, c.Parameters, "nms.RedeliveryPolicy.");
+ connection.UserName = this.connectionUserName;
+ connection.Password = this.connectionPassword;
- connection.ITransport.Start();
- return connection;
+ if(this.clientId != null)
+ {
+ connection.DefaultClientId = this.clientId;
+ }
+
+ connection.ITransport.Start();
+
+ return connection;
+ }
+ catch(NMSException e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw e;
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ connection.Close();
+ }
+ catch
+ {
+ }
+
+ throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
+ }
}
- // Properties
+ #region ConnectionFactory Properties
/// <summary>
/// Get/or set the broker Uri.
@@ -127,7 +158,17 @@
public Uri BrokerUri
{
get { return brokerUri; }
- set { brokerUri = value; }
+ set
+ {
+ brokerUri = value;
+
+ Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
+
+ URISupport.CompositeData c = URISupport.parseComposite(uri);
+ URISupport.SetProperties(this, c.Parameters, "connection.");
+ URISupport.SetProperties(this.PrefetchPolicy, c.Parameters, "nms.PrefetchPolicy.");
+ URISupport.SetProperties(this.RedeliveryPolicy, c.Parameters, "nms.RedeliveryPolicy.");
+ }
}
public string UserName
@@ -148,6 +189,53 @@
set { clientId = value; }
}
+ public string ClientIdPrefix
+ {
+ get { return clientIdPrefix; }
+ set { clientIdPrefix = value; }
+ }
+
+ public bool CopyMessageOnSend
+ {
+ get { return copyMessageOnSend; }
+ set { copyMessageOnSend = value; }
+ }
+
+ public bool AlwaysSyncSend
+ {
+ get { return alwaysSyncSend; }
+ set { alwaysSyncSend = value; }
+ }
+
+ public bool SendAcksAsync
+ {
+ get { return sendAcksAsync; }
+ set { sendAcksAsync = value; }
+ }
+
+ public bool AsyncSend
+ {
+ get { return asyncSend; }
+ set { asyncSend = value; }
+ }
+
+ public string AckMode
+ {
+ set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { this.acknowledgementMode = value; }
+ }
+
+ public PrefetchPolicy PrefetchPolicy
+ {
+ get { return this.prefetchPolicy; }
+ set { this.prefetchPolicy = value; }
+ }
+
public IRedeliveryPolicy RedeliveryPolicy
{
get { return this.redeliveryPolicy; }
@@ -160,6 +248,30 @@
}
}
+ public IdGenerator ClientIdGenerator
+ {
+ set { this.clientIdGenerator = value; }
+ get
+ {
+ lock(this)
+ {
+ if(this.clientIdGenerator == null)
+ {
+ if(this.clientIdPrefix != null)
+ {
+ this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+ }
+ else
+ {
+ this.clientIdGenerator = new IdGenerator();
+ }
+ }
+
+ return this.clientIdGenerator;
+ }
+ }
+ }
+
public event ExceptionListener OnException
{
add { onException += value; }
@@ -172,23 +284,17 @@
}
}
- protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
- {
- ConnectionInfo answer = new ConnectionInfo();
- ConnectionId connectionId = new ConnectionId();
- connectionId.Value = CreateNewGuid();
-
- answer.ConnectionId = connectionId;
- answer.UserName = userName;
- answer.Password = password;
- answer.ClientId = clientId ?? CreateNewGuid();
+ #endregion
- return answer;
- }
-
- protected static string CreateNewGuid()
+ protected virtual void ConfigureConnection(Connection connection)
{
- return Guid.NewGuid().ToString();
+ connection.AsyncSend = this.AsyncSend;
+ connection.CopyMessageOnSend = this.CopyMessageOnSend;
+ connection.AlwaysSyncSend = this.AlwaysSyncSend;
+ connection.SendAcksAsync = this.SendAcksAsync;
+ connection.AcknowledgementMode = this.acknowledgementMode;
+ connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+ connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
}
protected static void ExceptionHandler(Exception ex)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Thu Jan 7 22:31:30 2010
@@ -282,7 +282,7 @@
ack.MessageCount = 1;
Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
- this.session.Connection.Oneway(ack);
+ this.session.SendAck(ack);
}
protected void DoNothingAcknowledge(Message message)
@@ -344,7 +344,7 @@
try
{
- this.session.Connection.Oneway(ackToSend);
+ this.session.SendAck(ackToSend);
}
catch(Exception e)
{
@@ -599,8 +599,8 @@
ack.Destination = dispatch.Destination;
ack.LastMessageId = dispatch.Message.MessageId;
ack.MessageCount = 1;
-
- this.session.Connection.Oneway(ack);
+
+ this.session.SendAck(ack);
}
this.deliveringAcks.Value = false;
@@ -678,7 +678,7 @@
if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
{
- this.session.Connection.Oneway(pendingAck);
+ this.session.SendAck(pendingAck);
this.pendingAck = null;
this.deliveredCounter = 0;
this.additionalWindowSize = 0;
@@ -703,7 +703,7 @@
ack.TransactionId = this.session.TransactionContext.TransactionId;
}
- this.session.Connection.Oneway(ack);
+ this.session.SendAck(ack);
this.pendingAck = null;
// Adjust the counters
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Thu Jan 7 22:31:30 2010
@@ -788,6 +788,23 @@
return message;
}
+ internal void SendAck(MessageAck ack)
+ {
+ this.SendAck(ack, false);
+ }
+
+ internal void SendAck(MessageAck ack, bool lazy)
+ {
+ if(lazy || connection.SendAcksAsync || this.IsTransacted )
+ {
+ this.connection.Oneway(ack);
+ }
+ else
+ {
+ this.connection.SyncRequest(ack);
+ }
+ }
+
/// <summary>
/// Prevents message from throwing an exception if a client calls Acknoweldge on
/// a message that is part of a transaction either being produced or consumed. The
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=897036&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Thu Jan 7 22:31:30 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net.Sockets;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+ [TestFixture]
+ public class NMSConnectionFactoryTest
+ {
+ [RowTest]
+ [Row("stomp:tcp://${activemqhost}:61613")]
+ [Row("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
+ [Row("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:tcp://InvalidHost:61613?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+ [Row("ftp://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Row("http://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Row("discovery://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
+ [Row("sms://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:multicast://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
+ [Row("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
+
+ [Row("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)", ExpectedException = typeof(UriFormatException))]
+ [Row("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613", ExpectedException = typeof(UriFormatException))]
+ public void TestURI(string connectionURI)
+ {
+ NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(connectionURI));
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ using(IConnection connection = factory.CreateConnection("", ""))
+ {
+ Assert.IsNotNull(connection);
+ }
+ }
+
+ [Test]
+ public void TestURIForPrefetchHandling()
+ {
+ string uri1 = "stomp:tcp://${activemqhost}:61613" +
+ "?nms.PrefetchPolicy.queuePrefetch=1" +
+ "&nms.PrefetchPolicy.queueBrowserPrefetch=2" +
+ "&nms.PrefetchPolicy.topicPrefetch=3" +
+ "&nms.PrefetchPolicy.durableTopicPrefetch=4" +
+ "&nms.PrefetchPolicy.maximumPendingMessageLimit=5";
+
+ string uri2 = "stomp:tcp://${activemqhost}:61613" +
+ "?nms.PrefetchPolicy.queuePrefetch=112" +
+ "&nms.PrefetchPolicy.queueBrowserPrefetch=212" +
+ "&nms.PrefetchPolicy.topicPrefetch=312" +
+ "&nms.PrefetchPolicy.durableTopicPrefetch=412" +
+ "&nms.PrefetchPolicy.maximumPendingMessageLimit=512";
+
+ NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri1));
+
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ using(IConnection connection = factory.CreateConnection("", ""))
+ {
+ Assert.IsNotNull(connection);
+
+ Connection amqConnection = connection as Connection;
+ Assert.AreEqual(1, amqConnection.PrefetchPolicy.QueuePrefetch);
+ Assert.AreEqual(2, amqConnection.PrefetchPolicy.QueueBrowserPrefetch);
+ Assert.AreEqual(3, amqConnection.PrefetchPolicy.TopicPrefetch);
+ Assert.AreEqual(4, amqConnection.PrefetchPolicy.DurableTopicPrefetch);
+ Assert.AreEqual(5, amqConnection.PrefetchPolicy.MaximumPendingMessageLimit);
+ }
+
+ factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri2));
+
+ Assert.IsNotNull(factory);
+ Assert.IsNotNull(factory.ConnectionFactory);
+ using(IConnection connection = factory.CreateConnection("", ""))
+ {
+ Assert.IsNotNull(connection);
+
+ Connection amqConnection = connection as Connection;
+ Assert.AreEqual(112, amqConnection.PrefetchPolicy.QueuePrefetch);
+ Assert.AreEqual(212, amqConnection.PrefetchPolicy.QueueBrowserPrefetch);
+ Assert.AreEqual(312, amqConnection.PrefetchPolicy.TopicPrefetch);
+ Assert.AreEqual(412, amqConnection.PrefetchPolicy.DurableTopicPrefetch);
+ Assert.AreEqual(512, amqConnection.PrefetchPolicy.MaximumPendingMessageLimit);
+ }
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs?rev=897036&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs Thu Jan 7 22:31:30 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+using Apache.NMS.Stomp.Threads;
+
+namespace Apache.NMS.Stomp.Test.Threads
+{
+ [TestFixture]
+ public class DedicatedTaskRunnerTest
+ {
+ class SimpleCountingTask : Task
+ {
+ private uint count;
+
+ public SimpleCountingTask()
+ {
+ this.count = 0;
+ }
+
+ public bool Iterate()
+ {
+ count++;
+ return false;
+ }
+
+ public uint Count
+ {
+ get{ return count; }
+ }
+ }
+
+ class InfiniteCountingTask : Task
+ {
+ private uint count;
+
+ public InfiniteCountingTask()
+ {
+ this.count = 0;
+ }
+
+ public bool Iterate()
+ {
+ count++;
+ return true;
+ }
+
+ public uint Count
+ {
+ get{ return count; }
+ }
+ }
+
+ [Test]
+ public void TestSimple()
+ {
+ try
+ {
+ new DedicatedTaskRunner(null);
+ Assert.Fail("Should throw a NullReferenceException");
+ }
+ catch
+ {
+ }
+
+ SimpleCountingTask simpleTask = new SimpleCountingTask();
+ Assert.IsTrue( simpleTask.Count == 0 );
+ DedicatedTaskRunner simpleTaskRunner = new DedicatedTaskRunner(simpleTask);
+
+ simpleTaskRunner.Wakeup();
+ Thread.Sleep( 500 );
+ Assert.IsTrue( simpleTask.Count >= 1 );
+ simpleTaskRunner.Wakeup();
+ Thread.Sleep( 500 );
+ Assert.IsTrue( simpleTask.Count >= 2 );
+
+ InfiniteCountingTask infiniteTask = new InfiniteCountingTask();
+ Assert.IsTrue( infiniteTask.Count == 0 );
+ DedicatedTaskRunner infiniteTaskRunner = new DedicatedTaskRunner(infiniteTask);
+ Thread.Sleep( 500 );
+ Assert.IsTrue( infiniteTask.Count != 0 );
+ infiniteTaskRunner.Shutdown();
+ uint count = infiniteTask.Count;
+ Thread.Sleep( 500 );
+ Assert.IsTrue( infiniteTask.Count == count );
+
+ }
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Thu Jan 7 22:31:30 2010
@@ -98,5 +98,7 @@
<Compile Include="src\test\csharp\Commands\CommandTest.cs" />
<Compile Include="src\test\csharp\Commands\MessageTest.cs" />
<Compile Include="src\test\csharp\Commands\TextMessageTest.cs" />
+ <Compile Include="src\test\csharp\NMSConnectionFactoryTest.cs" />
+ <Compile Include="src\test\csharp\Threads\DedicatedTaskRunnerTest.cs" />
</ItemGroup>
</Project>
\ No newline at end of file