You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/07/26 12:45:42 UTC

svn commit: r1365943 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQConnectionFactory.java thread/TaskRunnerFactory.java

Author: dejanb
Date: Thu Jul 26 10:45:42 2012
New Revision: 1365943

URL: http://svn.apache.org/viewvc?rev=1365943&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3885 - allow setting rejected task handler on thread pool executor

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1365943&r1=1365942&r2=1365943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Jul 26 10:45:42 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -203,6 +204,7 @@ public class ActiveMQConnection implemen
     private boolean nonBlockingRedelivery = false;
 
     private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
+    private RejectedExecutionHandler rejectedTaskHandler = null;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -985,9 +987,11 @@ public class ActiveMQConnection implemen
     }
 
     public TaskRunnerFactory getSessionTaskRunner() {
+        System.out.println(maxThreadPoolSize);
         synchronized (this) {
             if (sessionTaskRunner == null) {
                 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
+                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
             }
         }
         return sessionTaskRunner;
@@ -2595,4 +2599,12 @@ public class ActiveMQConnection implemen
         this.queueOnlyConnection = true;
         return this;
     }
+
+    public RejectedExecutionHandler getRejectedTaskHandler() {
+        return rejectedTaskHandler;
+    }
+
+    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
+        this.rejectedTaskHandler = rejectedTaskHandler;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1365943&r1=1365942&r2=1365943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Thu Jul 26 10:45:42 2012
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 
@@ -131,6 +132,7 @@ public class ActiveMQConnectionFactory e
     private boolean nonBlockingRedelivery = false;
     private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
     private TaskRunnerFactory sessionTaskRunner;
+    private RejectedExecutionHandler rejectedTaskHandler = null;
 
     // /////////////////////////////////////////////
     //
@@ -344,6 +346,7 @@ public class ActiveMQConnectionFactory e
         connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
         connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
         connection.setSessionTaskRunner(getSessionTaskRunner());
+        connection.setRejectedTaskHandler(getRejectedTaskHandler());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -1116,4 +1119,12 @@ public class ActiveMQConnectionFactory e
     public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
         this.sessionTaskRunner = sessionTaskRunner;
     }
+
+    public RejectedExecutionHandler getRejectedTaskHandler() {
+        return rejectedTaskHandler;
+    }
+
+    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
+        this.rejectedTaskHandler = rejectedTaskHandler;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1365943&r1=1365942&r2=1365943&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Thu Jul 26 10:45:42 2012
@@ -18,7 +18,7 @@ package org.apache.activemq.thread;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -46,6 +46,7 @@ public class TaskRunnerFactory implement
     private boolean dedicatedTaskRunner;
     private AtomicBoolean initDone = new AtomicBoolean(false);
     private int maxThreadPoolSize = Integer.MAX_VALUE;
+    private RejectedExecutionHandler rejectedTaskHandler = null;
 
     public TaskRunnerFactory() {
         this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
@@ -118,6 +119,9 @@ public class TaskRunnerFactory implement
                 return thread;
             }
         });
+        if (rejectedTaskHandler != null) {
+            rc.setRejectedExecutionHandler(rejectedTaskHandler);
+        }
         return rc;
     }
 
@@ -176,4 +180,12 @@ public class TaskRunnerFactory implement
     public void setMaxThreadPoolSize(int maxThreadPoolSize) {
         this.maxThreadPoolSize = maxThreadPoolSize;
     }
+
+    public RejectedExecutionHandler getRejectedTaskHandler() {
+        return rejectedTaskHandler;
+    }
+
+    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
+        this.rejectedTaskHandler = rejectedTaskHandler;
+    }
 }