You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/12/28 19:52:49 UTC

svn commit: r1225279 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Threads/ThreadPoolExecutor.cs test/csharp/AMQNET366Test.cs

Author: tabish
Date: Wed Dec 28 18:52:49 2011
New Revision: 1225279

URL: http://svn.apache.org/viewvc?rev=1225279&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-366

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1225279&r1=1225278&r2=1225279&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs Wed Dec 28 18:52:49 2011
@@ -35,6 +35,7 @@ namespace Apache.NMS.ActiveMQ.Threads
         private bool closing = false;
         private bool closed = false;
         private ManualResetEvent executionComplete = new ManualResetEvent(true);
+        private Thread workThread = null;
 
         /// <summary>
         /// Represents an asynchronous task that is executed on the ThreadPool
@@ -115,7 +116,7 @@ namespace Apache.NMS.ActiveMQ.Threads
                     this.closing = true;
                     this.workQueue.Clear();
 
-                    if(this.running)
+                    if(this.running && Thread.CurrentThread != this.workThread)
                     {
                         syncRoot.ReleaseMutex();
                         this.executionComplete.WaitOne();
@@ -135,6 +136,8 @@ namespace Apache.NMS.ActiveMQ.Threads
 
             lock(syncRoot)
             {
+                this.workThread = Thread.CurrentThread;
+
                 if(this.workQueue.Count == 0 || this.closing)
                 {
                     this.running = false;
@@ -151,6 +154,8 @@ namespace Apache.NMS.ActiveMQ.Threads
             }
             finally
             {
+                this.workThread = null;
+
                 if(this.closing)
                 {
                     this.running = false;

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs?rev=1225279&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs Wed Dec 28 18:52:49 2011
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class AMQNET366Test : NMSTestSupport
+    {
+        private IConnection connection;
+        private bool connectionClosed = false;
+        private readonly String connectionUri = "activemq:tcpfaulty://${activemqhost}:61616";
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+        }
+
+        [Test, Timeout(60000)]
+        public void TestConnection()
+        {
+            IConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(connectionUri));
+
+            using (connection = factory.CreateConnection())
+            using (ISession session = connection.CreateSession())
+            {
+                IDestination destination = SessionUtil.GetDestination(session, "queue://test.test.in");
+                using (IMessageConsumer consumer = session.CreateConsumer(destination))
+                {
+                    Connection amqConnection = connection as Connection;
+                    connection.ExceptionListener += ConnectionException;
+
+                    consumer.Listener += OnMessage;
+
+                    TcpFaultyTransport transport = amqConnection.ITransport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                    Assert.IsNotNull(transport);
+                    transport.OnewayCommandPreProcessor += FailOnKeepAlive;
+
+                    Thread.Sleep(TimeSpan.FromSeconds(2));
+
+                    connection.Start();
+
+                    int count = 30;
+                    while (count-- > 0)
+                    {
+                        if (!connectionClosed)
+                        {
+                            Thread.Sleep(TimeSpan.FromSeconds(3));
+                        }
+                    }
+
+                    Assert.IsTrue(connectionClosed);
+                }
+            }
+        }
+
+        public void FailOnKeepAlive(ITransport transport, Command command)
+        {
+            if (command.IsKeepAliveInfo)
+            {
+                throw new IOException("Simulated Transport Failure");
+            }
+        }
+
+        protected void OnMessage(IMessage receivedMsg)
+        {
+            var textMessage = receivedMsg as ITextMessage;
+
+            if (textMessage == null)
+            {
+                Tracer.Info("null");
+            }
+            else
+            {
+                Tracer.Info(textMessage.Text);
+            }
+        }
+
+        private void ConnectionException(Exception e)
+        {
+            Tracer.Debug("Connection signalled an Exception");
+            connection.Close();
+            this.connectionClosed = true;
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQNET366Test.cs
------------------------------------------------------------------------------
    svn:eol-style = native