You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/04/08 12:22:40 UTC

svn commit: r1090186 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Author: dejanb
Date: Fri Apr  8 10:22:40 2011
New Revision: 1090186

URL: http://svn.apache.org/viewvc?rev=1090186&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3272 - prevent RejectedExecutionException in kahadb

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1090186&r1=1090185&r2=1090186&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Apr  8 10:22:40 2011
@@ -28,14 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
@@ -182,7 +175,7 @@ public class KahaDBStore extends Message
         this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
         this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
-        this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
                 asyncQueueJobQueue, new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
@@ -190,7 +183,7 @@ public class KahaDBStore extends Message
                         return thread;
                     }
                 });
-        this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
                 asyncTopicJobQueue, new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
@@ -1040,8 +1033,12 @@ public class KahaDBStore extends Message
         }
     }
 
-    interface StoreTask {
+    public interface StoreTask {
         public boolean cancel();
+
+        public void aquireLocks();
+
+        public void releaseLocks();
     }
 
     class StoreQueueTask implements Runnable, StoreTask {
@@ -1071,7 +1068,7 @@ public class KahaDBStore extends Message
             return false;
         }
 
-        void aquireLocks() {
+        public void aquireLocks() {
             if (this.locked.compareAndSet(false, true)) {
                 try {
                     globalQueueSemaphore.acquire();
@@ -1084,7 +1081,7 @@ public class KahaDBStore extends Message
 
         }
 
-        void releaseLocks() {
+        public void releaseLocks() {
             if (this.locked.compareAndSet(true, false)) {
                 store.releaseLocalAsyncLock();
                 globalQueueSemaphore.release();
@@ -1106,8 +1103,6 @@ public class KahaDBStore extends Message
                 }
             } catch (Exception e) {
                 this.future.setException(e);
-            } finally {
-                releaseLocks();
             }
         }
 
@@ -1145,7 +1140,7 @@ public class KahaDBStore extends Message
         }
 
         @Override
-        void aquireLocks() {
+        public void aquireLocks() {
             if (this.locked.compareAndSet(false, true)) {
                 try {
                     globalTopicSemaphore.acquire();
@@ -1159,7 +1154,7 @@ public class KahaDBStore extends Message
         }
 
         @Override
-        void releaseLocks() {
+        public void releaseLocks() {
             if (this.locked.compareAndSet(true, false)) {
                 message.decrementReferenceCount();
                 store.releaseLocalAsyncLock();
@@ -1202,9 +1197,23 @@ public class KahaDBStore extends Message
                 }
             } catch (Exception e) {
                 this.future.setException(e);
-            } finally {
-                releaseLocks();
             }
         }
     }
+
+    public class StoreTaskExecutor extends ThreadPoolExecutor {
+
+        public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
+        }
+
+        protected void afterExecute(Runnable runnable, Throwable throwable) {
+            super.afterExecute(runnable, throwable);
+
+            if (runnable instanceof StoreTask) {
+               ((StoreTask)runnable).releaseLocks();
+            }
+
+        }
+    }
 }