You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by da...@apache.org on 2012/09/05 18:28:58 UTC

svn commit: r1381237 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: network/DemandForwardingBridgeSupport.java thread/DefaultThreadPools.java thread/TaskRunnerFactory.java transport/discovery/simple/SimpleDiscoveryAgent.java

Author: davsclaus
Date: Wed Sep  5 16:28:58 2012
New Revision: 1381237

URL: http://svn.apache.org/viewvc?rev=1381237&view=rev
Log:
AMQ-3451: Statis default thread pools is @deprecated. Refactored some code to avoid using that. Otherwise it causes leaks in Tomcat when redeploying apps.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1381237&r1=1381236&r2=1381237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Sep  5 16:28:58 2012
@@ -70,7 +70,6 @@ import org.apache.activemq.command.Shutd
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
@@ -93,7 +92,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
     private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
-    private final TaskRunnerFactory asyncTaskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory();
+    private TaskRunnerFactory asyncTaskRunner;
     protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
     protected final Transport localBroker;
     protected final Transport remoteBroker;
@@ -157,6 +156,9 @@ public abstract class DemandForwardingBr
 
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
+            asyncTaskRunner = new TaskRunnerFactory("ActiveMQ ForwardingBridge Task");
+            asyncTaskRunner.init();
+
             localBroker.setTransportListener(new DefaultTransportListener() {
 
                 @Override
@@ -374,6 +376,10 @@ public abstract class DemandForwardingBr
                     startedLatch.countDown();
                     startedLatch.countDown();
                     localStartedLatch.countDown();
+
+                    // stop task runner
+                    asyncTaskRunner.shutdown();
+                    asyncTaskRunner = null;
                     ss.throwFirstException();
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java?rev=1381237&r1=1381236&r2=1381237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DefaultThreadPools.java Wed Sep  5 16:28:58 2012
@@ -16,35 +16,18 @@
  */
 package org.apache.activemq.thread;
 
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-
 /**
- * 
- * 
+ * @deprecated do not use this class. Instead use {@link TaskRunnerFactory}
  */
+@Deprecated
 public final class DefaultThreadPools {
 
-//    private static final Executor DEFAULT_POOL;
-//    static {
-//        DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
-//            public Thread newThread(Runnable runnable) {
-//                Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
-//                thread.setDaemon(true);
-//                return thread;
-//            }
-//        });
-//    }    
     private static final TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
     
     private DefaultThreadPools() {        
     }
     
-//    public static Executor getDefaultPool() {
-//        return DEFAULT_POOL;
-//    }
-    
+    @Deprecated
     public static TaskRunnerFactory getDefaultTaskRunnerFactory() {
         return DEFAULT_TASK_RUNNER_FACTORY;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1381237&r1=1381236&r2=1381237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Wed Sep  5 16:28:58 2012
@@ -49,7 +49,11 @@ public class TaskRunnerFactory implement
     private RejectedExecutionHandler rejectedTaskHandler = null;
 
     public TaskRunnerFactory() {
-        this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
+        this("ActiveMQ Task");
+    }
+
+    public TaskRunnerFactory(String name) {
+        this(name, Thread.NORM_PRIORITY, true, 1000);
     }
 
     private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
@@ -98,7 +102,7 @@ public class TaskRunnerFactory implement
     }
 
     public void execute(Runnable runnable) {
-        execute(runnable, "ActiveMQ Task");
+        execute(runnable, name);
     }
 
     public void execute(Runnable runnable, String name) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=1381237&r1=1381236&r2=1381237&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Wed Sep  5 16:28:58 2012
@@ -21,7 +21,7 @@ import java.net.URI;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.command.DiscoveryEvent;
-import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.slf4j.Logger;
@@ -46,6 +46,7 @@ public class SimpleDiscoveryAgent implem
     private DiscoveryListener listener;
     private String services[] = new String[] {};
     private final AtomicBoolean running = new AtomicBoolean(false);
+    private TaskRunnerFactory taskRunner;
 
     class SimpleDiscoveryEvent extends DiscoveryEvent {
 
@@ -72,6 +73,9 @@ public class SimpleDiscoveryAgent implem
     }
 
     public void start() throws Exception {
+        taskRunner = new TaskRunnerFactory();
+        taskRunner.init();
+
         running.set(true);
         for (int i = 0; i < services.length; i++) {
             listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
@@ -80,6 +84,11 @@ public class SimpleDiscoveryAgent implem
 
     public void stop() throws Exception {
         running.set(false);
+
+        taskRunner.shutdown();
+
+        // TODO: Should we not remove the services on the listener?
+
         synchronized (sleepMutex) {
             sleepMutex.notifyAll();
         }
@@ -110,7 +119,7 @@ public class SimpleDiscoveryAgent implem
         if (event.failed.compareAndSet(false, true)) {
 
             listener.onServiceRemove(event);
-            DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
+            taskRunner.execute(new Runnable() {
                 public void run() {
 
                     // We detect a failed connection attempt because the service