You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jd...@apache.org on 2007/09/22 16:24:24 UTC
svn commit: r578446 - in
/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request:
Request.java RequestManager.java
Author: jdillon
Date: Sat Sep 22 07:24:23 2007
New Revision: 578446
URL: http://svn.apache.org/viewvc?rev=578446&view=rev
Log:
Changed the request mutex to a lock
Updated the request manager to lock on the request it's processing... still broken though... :-(
Modified:
geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java
geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java
Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java?rev=578446&r1=578445&r2=578446&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java Sat Sep 22 07:24:23 2007
@@ -26,6 +26,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
@@ -42,7 +44,7 @@
{
private transient final Logger log = LoggerFactory.getLogger(getClass());
- private transient final Object mutex = new Object();
+ final Lock lock = new ReentrantLock();
private final BlockingQueue<Object> responses = new LinkedBlockingQueue<Object>();
@@ -167,10 +169,6 @@
}
}
- Object getMutex() {
- return mutex;
- }
-
private void queueResponse(final Object answer) {
signaled = true;
@@ -180,7 +178,9 @@
void signal(final Response response) {
assert response != null;
- synchronized (mutex) {
+ lock.lock();
+
+ try {
if (log.isTraceEnabled()) {
log.debug("Signal response: {}", response);
}
@@ -194,21 +194,34 @@
endOfResponses = true;
}
}
+ finally {
+ lock.unlock();
+ }
}
void timeout() {
- synchronized (mutex) {
+ lock.lock();
+
+ try {
log.debug("Timeout");
queueResponse(RequestTimeoutException.class);
endOfResponses = true;
}
+ finally {
+ lock.unlock();
+ }
}
boolean isSignaled() {
- synchronized (mutex) {
+ lock.lock();
+
+ try {
return signaled;
+ }
+ finally {
+ lock.unlock();
}
}
}
Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java?rev=578446&r1=578445&r2=578446&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java Sat Sep 22 07:24:23 2007
@@ -25,6 +25,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.geronimo.gshell.remote.message.Message;
import org.apache.geronimo.gshell.remote.session.SessionAttributeBinder;
@@ -40,7 +41,9 @@
{
public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
- private transient final Logger log = LoggerFactory.getLogger(getClass());
+ private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0);
+
+ private transient final Logger log = LoggerFactory.getLogger(getClass() + "-" + INSTANCE_COUNTER.getAndIncrement());
private transient final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1);
@@ -48,10 +51,6 @@
private final Map<Request,TimeoutTask> timeouts = Collections.synchronizedMap(new HashMap<Request,TimeoutTask>());
- //
- // TODO: Lock on Request.getMutex(), and/or change the mutex to a read/write lock?
- //
-
public boolean contains(final Message.ID id) {
assert id != null;
@@ -61,26 +60,40 @@
public boolean contains(final Request request) {
assert request != null;
- return contains(request.getId());
+ request.lock.lock();
+
+ try {
+ return contains(request.getId());
+ }
+ finally {
+ request.lock.unlock();
+ }
}
public void add(final Request request) {
assert request != null;
- Message.ID id = request.getId();
+ request.lock.lock();
- if (contains(request)) {
- throw new DuplicateRequestException(id);
- }
+ try {
+ Message.ID id = request.getId();
- if (log.isTraceEnabled()) {
- log.trace("Adding: {}", request);
+ if (contains(request)) {
+ throw new DuplicateRequestException(id);
+ }
+
+ requests.put(id, request);
+
+ if (log.isTraceEnabled()) {
+ log.trace("Added: {}", request);
+ }
+ else {
+ log.debug("Added: {}", id);
+ }
}
- else {
- log.debug("Adding: {}", id);
+ finally {
+ request.lock.unlock();
}
-
- requests.put(id, request);
}
public Request get(final Message.ID id) {
@@ -92,29 +105,31 @@
public Request remove(final Message.ID id) {
assert id != null;
- log.debug("Removing: {}", id);
+ Request request = get(id);
- return requests.remove(id);
- }
-
- public void clear() {
- int l;
-
- l = requests.size();
-
- if (l > 0) {
- log.warn("Purging " + l + " request(s)");
+ if (request == null) {
+ throw new InvalidRequestMappingException(id);
}
- requests.clear();
+ Request prev;
+
+ request.lock.lock();
- l = timeouts.size();
+ try {
+ prev = requests.remove(id);
- if (l > 0) {
- log.warn("Purging " + l + " timeouts(s)");
+ if (log.isTraceEnabled()) {
+ log.trace("Removed: {}", prev);
+ }
+ else {
+ log.debug("Removed: {}", id);
+ }
+ }
+ finally {
+ request.lock.unlock();
}
- timeouts.clear();
+ return prev;
}
//
@@ -124,86 +139,107 @@
public void schedule(final Request request) {
assert request != null;
- Message.ID id = request.getId();
+ request.lock.lock();
- if (timeouts.containsKey(request)) {
- throw new DuplicateRequestException(id);
- }
+ try {
+ Message.ID id = request.getId();
- if (request != get(id)) {
- throw new InvalidRequestMappingException(id);
- }
+ if (timeouts.containsKey(request)) {
+ throw new DuplicateRequestException(id);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Scheduling: {}", request);
- }
- else {
- log.debug("Scheduling: {}", id);
- }
+ if (request != get(id)) {
+ throw new InvalidRequestMappingException(id);
+ }
+
+ TimeoutTask task = new TimeoutTask(request);
- TimeoutTask task = new TimeoutTask(request);
+ ScheduledFuture<?> tf = scheduler.schedule(task, request.getTimeout(), request.getTimeoutUnit());
- ScheduledFuture<?> tf = scheduler.schedule(task, request.getTimeout(), request.getTimeoutUnit());
+ task.setTimeoutFuture(tf);
- task.setTimeoutFuture(tf);
+ timeouts.put(request, task);
- timeouts.put(request, task);
+ if (log.isTraceEnabled()) {
+ log.trace("Scheduled: {}", request);
+ }
+ else {
+ log.debug("Scheduled: {}", id);
+ }
+ }
+ finally {
+ request.lock.unlock();
+ }
}
public void cancel(final Request request) {
assert request != null;
- Message.ID id = request.getId();
+ request.lock.lock();
- TimeoutTask task = timeouts.remove(request);
+ try {
+ Message.ID id = request.getId();
- if (task == null) {
- throw new MissingRequestTimeoutException(id);
- }
+ TimeoutTask task = timeouts.remove(request);
- if (remove(id) != request) {
- throw new InvalidRequestMappingException(id);
- }
+ if (task == null) {
+ throw new MissingRequestTimeoutException(id);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Canceling: {}", request);
- }
- else {
- log.debug("Canceling: {}", id);
- }
+ if (remove(id) != request) {
+ throw new InvalidRequestMappingException(id);
+ }
- ScheduledFuture<?> sf = task.getTimeoutFuture();
+ if (log.isTraceEnabled()) {
+ log.trace("Canceling: {}", request);
+ }
+ else {
+ log.debug("Canceling: {}", id);
+ }
- if (sf != null) {
- sf.cancel(false);
+ ScheduledFuture<?> sf = task.getTimeoutFuture();
+
+ if (sf != null) {
+ sf.cancel(false);
+ }
+ }
+ finally {
+ request.lock.unlock();
}
}
private void timeout(final Request request) {
assert request != null;
- Message.ID id = request.getId();
+ request.lock.lock();
- if (log.isTraceEnabled()) {
- log.trace("Triggering: {}", request);
- }
- else {
- log.debug("Triggering: {}", id);
- }
+ try {
+ Message.ID id = request.getId();
- TimeoutTask task = timeouts.remove(request);
-
- if (task == null) {
- throw new MissingRequestTimeoutException(id);
- }
-
- if (remove(id) != request) {
- throw new InvalidRequestMappingException(id);
+ if (log.isTraceEnabled()) {
+ log.trace("Triggering: {}", request);
+ }
+ else {
+ log.debug("Triggering: {}", id);
+ }
+
+ TimeoutTask task = timeouts.remove(request);
+
+ if (task == null) {
+ throw new MissingRequestTimeoutException(id);
+ }
+
+ if (remove(id) != request) {
+ throw new InvalidRequestMappingException(id);
+ }
+
+ // If the request has not been signaled, then its a timeout :-(
+ if (!request.isSignaled()) {
+ request.timeout();
+ }
}
-
- // If the request has not been signaled, then its a timeout :-(
- if (!request.isSignaled()) {
- request.timeout();
+ finally {
+ request.lock.unlock();
}
}
@@ -215,6 +251,26 @@
for (Request request : requests) {
timeout(request);
}
+ }
+
+ public void clear() {
+ int l;
+
+ l = requests.size();
+
+ if (l > 0) {
+ log.warn("Purging " + l + " request(s)");
+ }
+
+ requests.clear();
+
+ l = timeouts.size();
+
+ if (l > 0) {
+ log.warn("Purging " + l + " timeouts(s)");
+ }
+
+ timeouts.clear();
}
public void close() {