You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/20 16:09:21 UTC

svn commit: r1363790 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQConnectionFactory.java thread/TaskRunnerFactory.java

Author: dejanb
Date: Fri Jul 20 14:09:21 2012
New Revision: 1363790

URL: http://svn.apache.org/viewvc?rev=1363790&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3885 - limit the number of threads used by the session executor and provide a way to set a custom task runner factory

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1363790&r1=1363789&r2=1363790&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Jul 20 14:09:21 2012
@@ -110,6 +110,7 @@ public class ActiveMQConnection implemen
     public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
     public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
     public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
+    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
 
@@ -200,6 +201,8 @@ public class ActiveMQConnection implemen
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
 
+    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
+
     /**
      * Construct an <code>ActiveMQConnection</code>
      *
@@ -978,7 +981,7 @@ public class ActiveMQConnection implemen
     public TaskRunnerFactory getSessionTaskRunner() {
         synchronized (this) {
             if (sessionTaskRunner == null) {
-                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
+                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
             }
         }
         return sessionTaskRunner;
@@ -2568,4 +2571,12 @@ public class ActiveMQConnection implemen
     public RedeliveryPolicyMap getRedeliveryPolicyMap() {
         return redeliveryPolicyMap;
     }
+
+    public int getMaxThreadPoolSize() {
+        return maxThreadPoolSize;
+    }
+
+    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
+        this.maxThreadPoolSize = maxThreadPoolSize;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1363790&r1=1363789&r2=1363790&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Jul 20 14:09:21 2012
@@ -41,6 +41,7 @@ import org.apache.activemq.jndi.JNDIBase
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
@@ -67,6 +68,7 @@ public class ActiveMQConnectionFactory e
     public static final String DEFAULT_PASSWORD = null;
     public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
 
+
     protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
         public Thread newThread(Runnable run) {
             Thread thread = new Thread(run);
@@ -127,6 +129,8 @@ public class ActiveMQConnectionFactory e
     private boolean messagePrioritySupported = true;
     private boolean transactedIndividualAck = false;
     private boolean nonBlockingRedelivery = false;
+    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
+    private TaskRunnerFactory sessionTaskRunner;
 
     // /////////////////////////////////////////////
     //
@@ -338,6 +342,8 @@ public class ActiveMQConnectionFactory e
         connection.setMessagePrioritySupported(isMessagePrioritySupported());
         connection.setTransactedIndividualAck(isTransactedIndividualAck());
         connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
+        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
+        connection.setSessionTaskRunner(getSessionTaskRunner());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -1095,4 +1101,19 @@ public class ActiveMQConnectionFactory e
          this.nonBlockingRedelivery = nonBlockingRedelivery;
      }
 
+    public int getMaxThreadPoolSize() {
+        return maxThreadPoolSize;
+    }
+
+    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
+        this.maxThreadPoolSize = maxThreadPoolSize;
+    }
+
+    public TaskRunnerFactory getSessionTaskRunner() {
+        return sessionTaskRunner;
+    }
+
+    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
+        this.sessionTaskRunner = sessionTaskRunner;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1363790&r1=1363789&r2=1363790&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Fri Jul 20 14:09:21 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.thread;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,6 +45,7 @@ public class TaskRunnerFactory implement
     private AtomicLong id = new AtomicLong(0);
     private boolean dedicatedTaskRunner;
     private AtomicBoolean initDone = new AtomicBoolean(false);
+    private int maxThreadPoolSize = Integer.MAX_VALUE;
 
     public TaskRunnerFactory() {
         this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
@@ -54,11 +56,16 @@ public class TaskRunnerFactory implement
     }
 
     public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
+        this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE);
+    }
+
+    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
         this.name = name;
         this.priority = priority;
         this.daemon = daemon;
         this.maxIterationsPerRun = maxIterationsPerRun;
         this.dedicatedTaskRunner = dedicatedTaskRunner;
+        this.maxThreadPoolSize = maxThreadPoolSize;
     }
 
     public void init() {
@@ -103,7 +110,7 @@ public class TaskRunnerFactory implement
     }
 
     protected ExecutorService createDefaultExecutor() {
-        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
                 Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
                 thread.setDaemon(daemon);
@@ -161,4 +168,12 @@ public class TaskRunnerFactory implement
     public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
         this.dedicatedTaskRunner = dedicatedTaskRunner;
     }
+
+    public int getMaxThreadPoolSize() {
+        return maxThreadPoolSize;
+    }
+
+    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
+        this.maxThreadPoolSize = maxThreadPoolSize;
+    }
 }