You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/04/08 12:22:40 UTC
svn commit: r1090186 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Author: dejanb
Date: Fri Apr 8 10:22:40 2011
New Revision: 1090186
URL: http://svn.apache.org/viewvc?rev=1090186&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3272 - prevent RejectedExecutionException in kahadb
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1090186&r1=1090185&r2=1090186&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Apr 8 10:22:40 2011
@@ -28,14 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
@@ -182,7 +175,7 @@ public class KahaDBStore extends Message
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
- this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+ this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncQueueJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
@@ -190,7 +183,7 @@ public class KahaDBStore extends Message
return thread;
}
});
- this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+ this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
asyncTopicJobQueue, new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
@@ -1040,8 +1033,12 @@ public class KahaDBStore extends Message
}
}
- interface StoreTask {
+ public interface StoreTask {
public boolean cancel();
+
+ public void aquireLocks();
+
+ public void releaseLocks();
}
class StoreQueueTask implements Runnable, StoreTask {
@@ -1071,7 +1068,7 @@ public class KahaDBStore extends Message
return false;
}
- void aquireLocks() {
+ public void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalQueueSemaphore.acquire();
@@ -1084,7 +1081,7 @@ public class KahaDBStore extends Message
}
- void releaseLocks() {
+ public void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
store.releaseLocalAsyncLock();
globalQueueSemaphore.release();
@@ -1106,8 +1103,6 @@ public class KahaDBStore extends Message
}
} catch (Exception e) {
this.future.setException(e);
- } finally {
- releaseLocks();
}
}
@@ -1145,7 +1140,7 @@ public class KahaDBStore extends Message
}
@Override
- void aquireLocks() {
+ public void aquireLocks() {
if (this.locked.compareAndSet(false, true)) {
try {
globalTopicSemaphore.acquire();
@@ -1159,7 +1154,7 @@ public class KahaDBStore extends Message
}
@Override
- void releaseLocks() {
+ public void releaseLocks() {
if (this.locked.compareAndSet(true, false)) {
message.decrementReferenceCount();
store.releaseLocalAsyncLock();
@@ -1202,9 +1197,23 @@ public class KahaDBStore extends Message
}
} catch (Exception e) {
this.future.setException(e);
- } finally {
- releaseLocks();
}
}
}
+
+ public class StoreTaskExecutor extends ThreadPoolExecutor {
+
+ public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory);
+ }
+
+ protected void afterExecute(Runnable runnable, Throwable throwable) {
+ super.afterExecute(runnable, throwable);
+
+ if (runnable instanceof StoreTask) {
+ ((StoreTask)runnable).releaseLocks();
+ }
+
+ }
+ }
}