You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/12/15 13:44:48 UTC

svn commit: r1049527 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/discovery/simple/ main/java/org/apache/activemq/transport/vm/ test/java/org/apache/activemq/transport/vm/

Author: gtully
Date: Wed Dec 15 12:44:47 2010
New Revision: 1049527

URL: http://svn.apache.org/viewvc?rev=1049527&view=rev
Log:
additional broker side improvements for https://issues.apache.org/jira/browse/AMQ-2852 - have discovery and network connector and vm async tasks delegate to the default thread pool executor, serialized the test to ensure shutdown is called once after verification

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Dec 15 12:44:47 2010
@@ -25,9 +25,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,6 +65,8 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
@@ -92,7 +91,7 @@ import org.apache.commons.logging.LogFac
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
     private static final Log LOG = LogFactory.getLog(DemandForwardingBridgeSupport.class);
-    private static final ThreadPoolExecutor ASYNC_TASKS;
+    private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
     protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
     protected final Transport remoteBroker;
@@ -251,7 +250,7 @@ public abstract class DemandForwardingBr
     }
 
     protected void triggerLocalStartBridge() throws IOException {
-        ASYNC_TASKS.execute(new Runnable() {
+        asyncTaskRunner.execute(new Runnable() {
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@@ -267,7 +266,7 @@ public abstract class DemandForwardingBr
     }
 
     protected void triggerRemoteStartBridge() throws IOException {
-        ASYNC_TASKS.execute(new Runnable() {
+        asyncTaskRunner.execute(new Runnable() {
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
@@ -391,7 +390,7 @@ public abstract class DemandForwardingBr
                 try {
                     remoteBridgeStarted.set(false);
                     final CountDownLatch sendShutdown = new CountDownLatch(1);
-                    ASYNC_TASKS.execute(new Runnable() {
+                    asyncTaskRunner.execute(new Runnable() {
                         public void run() {
                             try {
                                 localBroker.oneway(new ShutdownInfo());
@@ -433,7 +432,7 @@ public abstract class DemandForwardingBr
                 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
             }
             LOG.debug("The remote Exception was: " + error, error);
-            ASYNC_TASKS.execute(new Runnable() {
+            asyncTaskRunner.execute(new Runnable() {
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -647,7 +646,7 @@ public abstract class DemandForwardingBr
         if (!disposed.get()) {
             LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
             LOG.debug("The local Exception was:" + error, error);
-            ASYNC_TASKS.execute(new Runnable() {
+            asyncTaskRunner.execute(new Runnable() {
                 public void run() {
                     ServiceSupport.dispose(getControllingService());
                 }
@@ -674,7 +673,7 @@ public abstract class DemandForwardingBr
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
 
             // continue removal in separate thread to free up this thread for outstanding responses
-            ASYNC_TASKS.execute(new Runnable() {
+            asyncTaskRunner.execute(new Runnable() {
                 public void run() {
                     sub.waitForCompletion();
                     try {
@@ -1277,15 +1276,4 @@ public abstract class DemandForwardingBr
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
-
-    static {
-        ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "NetworkBridge");
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Wed Dec 15 12:44:47 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.apache.commons.logging.Log;
@@ -38,8 +39,7 @@ import org.apache.commons.logging.LogFac
  */
 public class SimpleDiscoveryAgent implements DiscoveryAgent {
 
-    private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class); 
-    private static final ThreadPoolExecutor ASYNC_TASKS;
+    private final static Log LOG = LogFactory.getLog(SimpleDiscoveryAgent.class);
     private long initialReconnectDelay = 1000;
     private long maxReconnectDelay = 1000 * 30;
     private long backOffMultiplier = 2;
@@ -110,14 +110,14 @@ public class SimpleDiscoveryAgent implem
         if (event.failed.compareAndSet(false, true)) {
 
             listener.onServiceRemove(event);
-            ASYNC_TASKS.execute(new Runnable() {
+            DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
                 public void run() {
 
                     // We detect a failed connection attempt because the service
                     // fails right
                     // away.
                     if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
-                        LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: "+event);
+                        LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
 
                         event.connectFailures++;
 
@@ -132,7 +132,7 @@ public class SimpleDiscoveryAgent implem
                                     return;
                                 }
 
-                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
+                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
                                 sleepMutex.wait(event.reconnectDelay);
                             } catch (InterruptedException ie) {
                                 Thread.currentThread().interrupt();
@@ -163,7 +163,7 @@ public class SimpleDiscoveryAgent implem
                     event.failed.set(false);
                     listener.onServiceAdd(event);
                 }
-            });
+            }, "Simple Discovery Agent");
         }
     }
 
@@ -214,16 +214,4 @@ public class SimpleDiscoveryAgent implem
     public void setUseExponentialBackOff(boolean useExponentialBackOff) {
         this.useExponentialBackOff = useExponentialBackOff;
     }
-    
-    static {
-        ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "Simple Discovery Agent: "+runnable);
-                thread.setDaemon(true);
-                return thread;
-            }
-        });
-    }
-
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Dec 15 12:44:47 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -44,8 +45,6 @@ public class VMTransport implements Tran
 
     private static final Object DISCONNECT = new Object();
     private static final AtomicLong NEXT_ID = new AtomicLong(0);
-    // still possible to configure dedicated task runner through system property but not programmatically
-    private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000, false);
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean disposed;
@@ -331,7 +330,7 @@ public class VMTransport implements Tran
         if (async) {
             synchronized (lazyInitMutext) {
                 if (taskRunner == null) {
-                    taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString());
+                    taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
                 }
             }
             try {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java?rev=1049527&r1=1049526&r2=1049527&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java Wed Dec 15 12:44:47 2010
@@ -27,8 +27,8 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.bugs.embedded.ThreadExplorer;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.thread.DefaultThreadPools;
 
-import static org.apache.activemq.thread.DefaultThreadPools.shutdown;
 
 public class VmTransportNetworkBrokerTest extends TestCase {
 
@@ -38,8 +38,10 @@ public class VmTransportNetworkBrokerTes
     CountDownLatch started = new CountDownLatch(1);
     CountDownLatch gotConnection = new CountDownLatch(1);
 
-    public void testNoThreadLeakWithActiveVMConnection() throws Exception {
-        
+    public void testNoThreadLeak() throws Exception {
+
+        // with VMConnection and simple discovery network connector
+        int originalThreadCount = Thread.activeCount();
         BrokerService broker = new BrokerService();
         broker.setDedicatedTaskRunner(true);
         broker.setPersistent(false);
@@ -55,43 +57,42 @@ public class VmTransportNetworkBrokerTes
         // let it settle
         TimeUnit.SECONDS.sleep(5);
         
-        int threadCount = Thread.activeCount();
+        int threadCountAfterStart = Thread.activeCount();
         TimeUnit.SECONDS.sleep(30);
         int threadCountAfterSleep = Thread.activeCount();
         
-        assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" + threadCount + " threadCountAfterSleep=" + threadCountAfterSleep,
-                threadCountAfterSleep < threadCount + 8);
+        assertTrue("Threads are leaking: " + ThreadExplorer.show("active sleep") + ", threadCount=" +threadCountAfterStart + " threadCountAfterSleep=" + threadCountAfterSleep,
+                threadCountAfterSleep < threadCountAfterStart + 8);
 
         connection.close();
         broker.stop();
         broker.waitUntilStopped();
-    }
 
-    public void testNoDanglingThreadsAfterStop() throws Exception {
+        // testNoDanglingThreadsAfterStop with tcp transport
 
-        int threadCount = Thread.activeCount();
-        BrokerService broker = new BrokerService();
+        broker = new BrokerService();
         broker.setSchedulerSupport(true);
         broker.setDedicatedTaskRunner(true);
         broker.setPersistent(false);
         broker.addConnector("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
         broker.start();
 
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
-        Connection connection = cf.createConnection("system", "manager");
+        cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
+        connection = cf.createConnection("system", "manager");
         connection.start();
         connection.close();
         broker.stop();
         broker.waitUntilStopped();
-        shutdown();
+
+        // must only be called when all brokers and connections are done!
+        DefaultThreadPools.shutdown();
 
         // let it settle
         TimeUnit.SECONDS.sleep(5);        
         
         int threadCountAfterStop = Thread.activeCount();
-        assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". threadCount=" + threadCount + " threadCountAfterStop=" + threadCountAfterStop,
-                threadCountAfterStop == threadCount);
+        assertTrue("Threads are leaking: " + ThreadExplorer.show("active after stop") + ". originalThreadCount=" + originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop,
+                threadCountAfterStop == originalThreadCount);
 
     }
-
 }