You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/07 23:31:31 UTC

svn commit: r897036 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: ./ src/main/csharp/ src/test/csharp/ src/test/csharp/Threads/

Author: tabish
Date: Thu Jan  7 22:31:30 2010
New Revision: 897036

URL: http://svn.apache.org/viewvc?rev=897036&view=rev
Log:
Adds async Acks option to Connection and a SendAck method in Session.  MessageConsumer modified to call SendAck. Refactored the ConnectionFactory and Connection to make them more like the ActiveMQ versions.

Adds Tests for ConnectionFactory through the NMSConnectionFactory and a test case for the DedicatedTaskRunner.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Thu Jan  7 22:31:30 2010
@@ -20,6 +20,7 @@
 using System.Threading;
 using Apache.NMS.Stomp.Commands;
 using Apache.NMS.Stomp.Transport;
+using Apache.NMS.Stomp.Util;
 using Apache.NMS;
 using Apache.NMS.Util;
 
@@ -30,17 +31,25 @@
     /// </summary>
     public class Connection : IConnection
     {
+        private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+
         private readonly Uri brokerUri;
         private ITransport transport;
         private readonly ConnectionInfo info;
+
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+        private bool asyncSend = false;
+        private bool alwaysSyncSend = false;
+        private bool copyMessageOnSend = true;
+        private bool sendAcksAsync = false;
+        private IRedeliveryPolicy redeliveryPolicy;
+        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+
+        private bool userSpecifiedClientID;
         private TimeSpan requestTimeout;
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
-        private bool asyncSend = false;
-        private bool alwaysSyncSend = false;
-        private bool copyMessageOnSend = true;
         private bool connected = false;
         private bool closed = false;
         private bool closing = false;
@@ -50,19 +59,25 @@
         private readonly Atomic<bool> started = new Atomic<bool>(false);
         private ConnectionMetaData metaData = null;
         private bool disposed = false;
-        private IRedeliveryPolicy redeliveryPolicy;
-        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+        private IdGenerator clientIdGenerator;
 
-        public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
+        public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
             this.brokerUri = connectionUri;
-            this.info = info;
             this.requestTimeout = transport.RequestTimeout;
+            this.clientIdGenerator = clientIdGenerator;
+
             this.transport = transport;
             this.transport.Command = new CommandHandler(OnCommand);
             this.transport.Exception = new ExceptionHandler(OnException);
             this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
             this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+
+            ConnectionId id = new ConnectionId();
+            id.Value = CONNECTION_ID_GENERATOR.GenerateId();
+
+            this.info = new ConnectionInfo();
+            this.info.ConnectionId = id;
         }
 
         ~Connection()
@@ -89,6 +104,18 @@
 
         #region Properties
 
+        public String UserName
+        {
+            get { return this.info.UserName; }
+            set { this.info.UserName = value; }
+        }
+
+        public String Password
+        {
+            get { return this.info.Password; }
+            set { this.info.Password = value; }
+        }
+
         /// <summary>
         /// This property indicates whether or not async send is enabled.
         /// </summary>
@@ -134,6 +161,17 @@
             set { copyMessageOnSend = value; }
         }
 
+        /// <summary>
+        /// This property indicates whether or not async sends are used for
+        /// message acknowledgement messages.  Sending Acks async can improve
+        /// performance but may decrease reliability.
+        /// </summary>
+        public bool SendAcksAsync
+        {
+            get { return sendAcksAsync; }
+            set { sendAcksAsync = value; }
+        }
+
         public IConnectionMetaData MetaData
         {
             get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
@@ -167,11 +205,26 @@
             get { return info.ClientId; }
             set
             {
-                if(connected)
+                if(this.connected)
                 {
                     throw new NMSException("You cannot change the ClientId once the Connection is connected");
                 }
-                info.ClientId = value;
+
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+                CheckConnected();
+            }
+        }
+
+        /// <summary>
+        /// The Default Client Id used if the ClientId property is not set explicitly.
+        /// </summary>
+        public string DefaultClientId
+        {
+            set
+            {
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
             }
         }
 
@@ -432,6 +485,11 @@
 
             if(!connected)
             {
+                if(!this.userSpecifiedClientID)
+                {
+                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                }
+
                 connected = true;
                 // now lets send the connection and see if we get an ack/nak
                 if(null == SyncRequest(info))

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Thu Jan  7 22:31:30 2010
@@ -16,6 +16,7 @@
  */
 
 using System;
+using Apache.NMS.Stomp.Util;
 using Apache.NMS.Stomp.Commands;
 using Apache.NMS.Stomp.Transport;
 using Apache.NMS;
@@ -30,13 +31,21 @@
     public class ConnectionFactory : IConnectionFactory
     {
         public const string DEFAULT_BROKER_URL = "tcp://localhost:61613";
-        public const string ENV_BROKER_URL = "STOMP_BROKER_URL";
+        public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
 
         private static event ExceptionListener onException;
         private Uri brokerUri;
         private string connectionUserName;
         private string connectionPassword;
         private string clientId;
+        private string clientIdPrefix;
+        private IdGenerator clientIdGenerator;
+
+        private bool copyMessageOnSend = true;
+        private bool asyncSend;
+        private bool alwaysSyncSend;
+        private bool sendAcksAsync=true;
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -77,8 +86,8 @@
 
         public ConnectionFactory(Uri brokerUri, string clientID)
         {
-            this.brokerUri = brokerUri;
-            this.clientId = clientID;
+            this.BrokerUri = brokerUri;
+            this.ClientId = clientID;
         }
 
         public IConnection CreateConnection()
@@ -88,38 +97,60 @@
 
         public IConnection CreateConnection(string userName, string password)
         {
-            // Strip off the activemq prefix, if it exists.
-            Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
+            Connection connection = null;
 
-            Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
+            try
+            {
+                // Strip off the "stomp:" prefix, if it exists.
+                Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
 
-            ConnectionInfo info = CreateConnectionInfo(userName, password);
-            ITransport transport = TransportFactory.CreateTransport(uri);
-            Connection connection = new Connection(uri, transport, info);
+                Tracer.InfoFormat("Connecting to: {0}", uri.ToString());
 
-            // Set the Factory level configuration to the Connection, this can be overriden by
-            // the params on the Connection URI so we do this before applying the params.
-            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+                ITransport transport = TransportFactory.CreateTransport(uri);
 
-            // Set properties on connection using parameters prefixed with "connection."
-            // Since this could be a composite Uri, assume the connection-specific parameters
-            // are associated with the outer-most specification of the composite Uri. What's nice
-            // is that this works with simple Uri as well.
-            URISupport.CompositeData c = URISupport.parseComposite(uri);
-            URISupport.SetProperties(connection, c.Parameters, "connection.");
+                connection = new Connection(uri, transport, this.ClientIdGenerator);
 
-            // Apply any URI options to the Prefetch policy in the Connection.
-            URISupport.SetProperties(connection.PrefetchPolicy, c.Parameters, "nms.PrefetchPolicy.");
+                ConfigureConnection(connection);
 
-            // Apply any URI options to the Redelivery policy in the Connection.
-            URISupport.SetProperties(connection.RedeliveryPolicy, c.Parameters, "nms.RedeliveryPolicy.");
+                connection.UserName = this.connectionUserName;
+                connection.Password = this.connectionPassword;
 
-            connection.ITransport.Start();
-            return connection;
+                if(this.clientId != null)
+                {
+                    connection.DefaultClientId = this.clientId;
+                }
+
+                connection.ITransport.Start();
+
+                return connection;
+            }
+            catch(NMSException e)
+            {
+                try
+                {
+                    connection.Close();
+                }
+                catch
+                {
+                }
+
+                throw e;
+            }
+            catch(Exception e)
+            {
+                try
+                {
+                    connection.Close();
+                }
+                catch
+                {
+                }
+
+                throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
+            }
         }
 
-        // Properties
+        #region ConnectionFactory Properties
 
         /// <summary>
         /// Get/or set the broker Uri.
@@ -127,7 +158,17 @@
         public Uri BrokerUri
         {
             get { return brokerUri; }
-            set { brokerUri = value; }
+            set
+            {
+                brokerUri = value;
+
+                Uri uri = new Uri(URISupport.stripPrefix(brokerUri.OriginalString, "stomp:"));
+
+                URISupport.CompositeData c = URISupport.parseComposite(uri);
+                URISupport.SetProperties(this, c.Parameters, "connection.");
+                URISupport.SetProperties(this.PrefetchPolicy, c.Parameters, "nms.PrefetchPolicy.");
+                URISupport.SetProperties(this.RedeliveryPolicy, c.Parameters, "nms.RedeliveryPolicy.");
+            }
         }
 
         public string UserName
@@ -148,6 +189,53 @@
             set { clientId = value; }
         }
 
+        public string ClientIdPrefix
+        {
+            get { return clientIdPrefix; }
+            set { clientIdPrefix = value; }
+        }
+
+        public bool CopyMessageOnSend
+        {
+            get { return copyMessageOnSend; }
+            set { copyMessageOnSend = value; }
+        }
+
+        public bool AlwaysSyncSend
+        {
+            get { return alwaysSyncSend; }
+            set { alwaysSyncSend = value; }
+        }
+
+        public bool SendAcksAsync
+        {
+            get { return sendAcksAsync; }
+            set { sendAcksAsync = value; }
+        }
+
+        public bool AsyncSend
+        {
+            get { return asyncSend; }
+            set { asyncSend = value; }
+        }
+
+        public string AckMode
+        {
+            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { this.acknowledgementMode = value; }
+        }
+
+        public PrefetchPolicy PrefetchPolicy
+        {
+            get { return this.prefetchPolicy; }
+            set { this.prefetchPolicy = value; }
+        }
+
         public IRedeliveryPolicy RedeliveryPolicy
         {
             get { return this.redeliveryPolicy; }
@@ -160,6 +248,30 @@
             }
         }
 
+        public IdGenerator ClientIdGenerator
+        {
+            set { this.clientIdGenerator = value; }
+            get
+            {
+                lock(this)
+                {
+                    if(this.clientIdGenerator == null)
+                    {
+                        if(this.clientIdPrefix != null)
+                        {
+                            this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+                        }
+                        else
+                        {
+                            this.clientIdGenerator = new IdGenerator();
+                        }
+                    }
+
+                    return this.clientIdGenerator;
+                }
+            }
+        }
+
         public event ExceptionListener OnException
         {
             add { onException += value; }
@@ -172,23 +284,17 @@
             }
         }
 
-        protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
-        {
-            ConnectionInfo answer = new ConnectionInfo();
-            ConnectionId connectionId = new ConnectionId();
-            connectionId.Value = CreateNewGuid();
-
-            answer.ConnectionId = connectionId;
-            answer.UserName = userName;
-            answer.Password = password;
-            answer.ClientId = clientId ?? CreateNewGuid();
+        #endregion
 
-            return answer;
-        }
-
-        protected static string CreateNewGuid()
+        protected virtual void ConfigureConnection(Connection connection)
         {
-            return Guid.NewGuid().ToString();
+            connection.AsyncSend = this.AsyncSend;
+            connection.CopyMessageOnSend = this.CopyMessageOnSend;
+            connection.AlwaysSyncSend = this.AlwaysSyncSend;
+            connection.SendAcksAsync = this.SendAcksAsync;
+            connection.AcknowledgementMode = this.acknowledgementMode;
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
         }
 
         protected static void ExceptionHandler(Exception ex)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Thu Jan  7 22:31:30 2010
@@ -282,7 +282,7 @@
             ack.MessageCount = 1;
 
             Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
-            this.session.Connection.Oneway(ack);
+            this.session.SendAck(ack);
         }
 
         protected void DoNothingAcknowledge(Message message)
@@ -344,7 +344,7 @@
 
                     try
                     {
-                        this.session.Connection.Oneway(ackToSend);
+                        this.session.SendAck(ackToSend);
                     }
                     catch(Exception e)
                     {
@@ -599,8 +599,8 @@
 			                ack.Destination = dispatch.Destination;
 			                ack.LastMessageId = dispatch.Message.MessageId;
 			                ack.MessageCount = 1;
-			
-                            this.session.Connection.Oneway(ack);
+
+                            this.session.SendAck(ack);
                         }
 						
                         this.deliveringAcks.Value = false;
@@ -678,7 +678,7 @@
 
             if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
             {
-                this.session.Connection.Oneway(pendingAck);
+                this.session.SendAck(pendingAck);
                 this.pendingAck = null;
                 this.deliveredCounter = 0;
                 this.additionalWindowSize = 0;
@@ -703,7 +703,7 @@
                     ack.TransactionId = this.session.TransactionContext.TransactionId;
                 }
 
-                this.session.Connection.Oneway(ack);
+                this.session.SendAck(ack);
                 this.pendingAck = null;
 
                 // Adjust the counters

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Thu Jan  7 22:31:30 2010
@@ -788,6 +788,23 @@
             return message;
         }
 
+        internal void SendAck(MessageAck ack)
+        {
+            this.SendAck(ack, false);
+        }
+
+        internal void SendAck(MessageAck ack, bool lazy)
+        {
+            if(lazy || connection.SendAcksAsync || this.IsTransacted )
+            {
+                this.connection.Oneway(ack);
+            }
+            else
+            {
+                this.connection.SyncRequest(ack);
+            }
+        }
+        
         /// <summary>
         /// Prevents message from throwing an exception if a client calls Acknoweldge on
         /// a message that is part of a transaction either being produced or consumed.  The

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=897036&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Thu Jan  7 22:31:30 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Net.Sockets;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    [TestFixture]
+    public class NMSConnectionFactoryTest
+    {
+        [RowTest]
+        [Row("stomp:tcp://${activemqhost}:61613")]
+        [Row("stomp:tcp://${activemqhost}:61613?connection.asyncsend=false")]
+        [Row("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:tcp://InvalidHost:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:tcp://InvalidHost:61613?connection.asyncsend=false", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:tcp://${activemqhost}:61613?connection.InvalidParameter=true", ExpectedException = typeof(NMSConnectionException))]
+        [Row("ftp://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Row("http://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Row("discovery://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
+        [Row("sms://${activemqhost}:61613", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:multicast://${activemqhost}:6155", ExpectedException = typeof(NMSConnectionException))]
+        [Row("stomp:(tcp://${activemqhost}:61613)?connection.asyncSend=false", ExpectedException = typeof(NMSConnectionException))]
+
+        [Row("(tcp://${activemqhost}:61613,tcp://${activemqhost}:61613)", ExpectedException = typeof(UriFormatException))]
+        [Row("tcp://${activemqhost}:61613,tcp://${activemqhost}:61613", ExpectedException = typeof(UriFormatException))]
+        public void TestURI(string connectionURI)
+        {
+            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(connectionURI));
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            using(IConnection connection = factory.CreateConnection("", ""))
+            {
+                Assert.IsNotNull(connection);
+            }
+        }
+
+        [Test]
+        public void TestURIForPrefetchHandling()
+        {
+            string uri1 = "stomp:tcp://${activemqhost}:61613" +
+                          "?nms.PrefetchPolicy.queuePrefetch=1" +
+                          "&nms.PrefetchPolicy.queueBrowserPrefetch=2" +
+                          "&nms.PrefetchPolicy.topicPrefetch=3" +
+                          "&nms.PrefetchPolicy.durableTopicPrefetch=4" +
+                          "&nms.PrefetchPolicy.maximumPendingMessageLimit=5";
+
+            string uri2 = "stomp:tcp://${activemqhost}:61613" +
+                          "?nms.PrefetchPolicy.queuePrefetch=112" +
+                          "&nms.PrefetchPolicy.queueBrowserPrefetch=212" +
+                          "&nms.PrefetchPolicy.topicPrefetch=312" +
+                          "&nms.PrefetchPolicy.durableTopicPrefetch=412" +
+                          "&nms.PrefetchPolicy.maximumPendingMessageLimit=512";
+
+            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri1));
+
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            using(IConnection connection = factory.CreateConnection("", ""))
+            {
+                Assert.IsNotNull(connection);
+
+                Connection amqConnection = connection as Connection;
+                Assert.AreEqual(1, amqConnection.PrefetchPolicy.QueuePrefetch);
+                Assert.AreEqual(2, amqConnection.PrefetchPolicy.QueueBrowserPrefetch);
+                Assert.AreEqual(3, amqConnection.PrefetchPolicy.TopicPrefetch);
+                Assert.AreEqual(4, amqConnection.PrefetchPolicy.DurableTopicPrefetch);
+                Assert.AreEqual(5, amqConnection.PrefetchPolicy.MaximumPendingMessageLimit);
+            }
+
+            factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri2));
+
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            using(IConnection connection = factory.CreateConnection("", ""))
+            {
+                Assert.IsNotNull(connection);
+
+                Connection amqConnection = connection as Connection;
+                Assert.AreEqual(112, amqConnection.PrefetchPolicy.QueuePrefetch);
+                Assert.AreEqual(212, amqConnection.PrefetchPolicy.QueueBrowserPrefetch);
+                Assert.AreEqual(312, amqConnection.PrefetchPolicy.TopicPrefetch);
+                Assert.AreEqual(412, amqConnection.PrefetchPolicy.DurableTopicPrefetch);
+                Assert.AreEqual(512, amqConnection.PrefetchPolicy.MaximumPendingMessageLimit);
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs?rev=897036&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs Thu Jan  7 22:31:30 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+using Apache.NMS.Stomp.Threads;
+
+namespace Apache.NMS.Stomp.Test.Threads
+{
+    [TestFixture]
+    public class DedicatedTaskRunnerTest
+    {
+        class SimpleCountingTask : Task
+        {
+            private uint count;
+
+            public SimpleCountingTask()
+            {
+                this.count = 0;
+            }
+
+            public bool Iterate()
+            {
+                count++;
+                return false;
+            }
+
+            public uint Count
+            {
+                get{ return count; }
+            }
+        }
+
+        class InfiniteCountingTask : Task
+        {
+            private uint count;
+
+            public InfiniteCountingTask()
+            {
+                this.count = 0;
+            }
+
+            public bool Iterate()
+            {
+                count++;
+                return true;
+            }
+
+            public uint Count
+            {
+                get{ return count; }
+            }
+        }
+
+        [Test]
+        public void TestSimple()
+        {
+            try
+            {
+                new DedicatedTaskRunner(null);
+                Assert.Fail("Should throw a NullReferenceException");
+            }
+            catch
+            {
+            }
+
+            SimpleCountingTask simpleTask = new SimpleCountingTask();
+            Assert.IsTrue( simpleTask.Count == 0 );
+            DedicatedTaskRunner simpleTaskRunner = new DedicatedTaskRunner(simpleTask);
+
+            simpleTaskRunner.Wakeup();
+            Thread.Sleep( 500 );
+            Assert.IsTrue( simpleTask.Count >= 1 );
+            simpleTaskRunner.Wakeup();
+            Thread.Sleep( 500 );
+            Assert.IsTrue( simpleTask.Count >= 2 );
+
+            InfiniteCountingTask infiniteTask = new InfiniteCountingTask();
+            Assert.IsTrue( infiniteTask.Count == 0 );
+            DedicatedTaskRunner infiniteTaskRunner = new DedicatedTaskRunner(infiniteTask);
+            Thread.Sleep( 500 );
+            Assert.IsTrue( infiniteTask.Count != 0 );
+            infiniteTaskRunner.Shutdown();
+            uint count = infiniteTask.Count;
+            Thread.Sleep( 500 );
+            Assert.IsTrue( infiniteTask.Count == count );
+
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=897036&r1=897035&r2=897036&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Thu Jan  7 22:31:30 2010
@@ -98,5 +98,7 @@
     <Compile Include="src\test\csharp\Commands\CommandTest.cs" />
     <Compile Include="src\test\csharp\Commands\MessageTest.cs" />
     <Compile Include="src\test\csharp\Commands\TextMessageTest.cs" />
+    <Compile Include="src\test\csharp\NMSConnectionFactoryTest.cs" />
+    <Compile Include="src\test\csharp\Threads\DedicatedTaskRunnerTest.cs" />
   </ItemGroup>
 </Project>
\ No newline at end of file