You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jd...@apache.org on 2007/09/22 16:24:24 UTC

svn commit: r578446 - in /geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request: Request.java RequestManager.java

Author: jdillon
Date: Sat Sep 22 07:24:23 2007
New Revision: 578446

URL: http://svn.apache.org/viewvc?rev=578446&view=rev
Log:
Changed the request mutex to a lock
Update the request manager to lock on the request it's processing... still broken though... :-(

Modified:
    geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java
    geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java?rev=578446&r1=578445&r2=578446&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/Request.java Sat Sep 22 07:24:23 2007
@@ -26,6 +26,8 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.geronimo.gshell.common.tostring.ReflectionToStringBuilder;
 import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
@@ -42,7 +44,7 @@
 {
     private transient final Logger log = LoggerFactory.getLogger(getClass());
 
-    private transient final Object mutex = new Object();
+    final Lock lock = new ReentrantLock();
 
     private final BlockingQueue<Object> responses = new LinkedBlockingQueue<Object>();
 
@@ -167,10 +169,6 @@
         }
     }
 
-    Object getMutex() {
-        return mutex;
-    }
-
     private void queueResponse(final Object answer) {
         signaled = true;
 
@@ -180,7 +178,9 @@
     void signal(final Response response) {
         assert response != null;
 
-        synchronized (mutex) {
+        lock.lock();
+
+        try {
             if (log.isTraceEnabled()) {
                 log.debug("Signal response: {}", response);
             }
@@ -194,21 +194,34 @@
                 endOfResponses = true;
             }
         }
+        finally {
+            lock.unlock();
+        }
     }
 
     void timeout() {
-        synchronized (mutex) {
+        lock.lock();
+
+        try {
             log.debug("Timeout");
 
             queueResponse(RequestTimeoutException.class);
 
             endOfResponses = true;
         }
+        finally {
+            lock.unlock();
+        }
     }
 
     boolean isSignaled() {
-        synchronized (mutex) {
+        lock.lock();
+
+        try {
             return signaled;
+        }
+        finally {
+            lock.unlock();
         }
     }
 }

Modified: geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java?rev=578446&r1=578445&r2=578446&view=diff
==============================================================================
--- geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java (original)
+++ geronimo/sandbox/gshell/trunk/gshell-remote/gshell-remote-common/src/main/java/org/apache/geronimo/gshell/remote/request/RequestManager.java Sat Sep 22 07:24:23 2007
@@ -25,6 +25,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.geronimo.gshell.remote.message.Message;
 import org.apache.geronimo.gshell.remote.session.SessionAttributeBinder;
@@ -40,7 +41,9 @@
 {
     public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
 
-    private transient final Logger log = LoggerFactory.getLogger(getClass());
+    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong(0);
+
+    private transient final Logger log = LoggerFactory.getLogger(getClass() + "-" + INSTANCE_COUNTER.getAndIncrement());
 
     private transient final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1);
 
@@ -48,10 +51,6 @@
 
     private final Map<Request,TimeoutTask> timeouts = Collections.synchronizedMap(new HashMap<Request,TimeoutTask>());
     
-    //
-    // TODO: Lock on Request.getMutex(), and/or change the mutex to a read/write lock?
-    //
-
     public boolean contains(final Message.ID id) {
         assert id != null;
 
@@ -61,26 +60,40 @@
     public boolean contains(final Request request) {
         assert request != null;
 
-        return contains(request.getId());
+        request.lock.lock();
+
+        try {
+            return contains(request.getId());
+        }
+        finally {
+            request.lock.unlock();
+        }
     }
 
     public void add(final Request request) {
         assert request != null;
 
-        Message.ID id = request.getId();
+        request.lock.lock();
 
-        if (contains(request)) {
-            throw new DuplicateRequestException(id);
-        }
+        try {
+            Message.ID id = request.getId();
 
-        if (log.isTraceEnabled()) {
-            log.trace("Adding: {}", request);
+            if (contains(request)) {
+                throw new DuplicateRequestException(id);
+            }
+
+            requests.put(id, request);
+
+            if (log.isTraceEnabled()) {
+                log.trace("Added: {}", request);
+            }
+            else {
+                log.debug("Added: {}", id);
+            }
         }
-        else {
-            log.debug("Adding: {}", id);
+        finally {
+            request.lock.unlock();
         }
-
-        requests.put(id, request);
     }
 
     public Request get(final Message.ID id) {
@@ -92,29 +105,31 @@
     public Request remove(final Message.ID id) {
         assert id != null;
 
-        log.debug("Removing: {}", id);
+        Request request = get(id);
 
-        return requests.remove(id);
-    }
-
-    public void clear() {
-        int l;
-
-        l = requests.size();
-
-        if (l > 0) {
-            log.warn("Purging " + l + " request(s)");
+        if (request == null) {
+            throw new InvalidRequestMappingException(id);
         }
 
-        requests.clear();
+        Request prev;
+        
+        request.lock.lock();
 
-        l = timeouts.size();
+        try {
+            prev = requests.remove(id);
 
-        if (l > 0) {
-            log.warn("Purging " + l + " timeouts(s)");
+            if (log.isTraceEnabled()) {
+                log.trace("Removed: {}", prev);
+            }
+            else {
+                log.debug("Removed: {}", id);
+            }
+        }
+        finally {
+            request.lock.unlock();
         }
 
-        timeouts.clear();
+        return prev;
     }
 
     //
@@ -124,86 +139,107 @@
     public void schedule(final Request request) {
         assert request != null;
 
-        Message.ID id = request.getId();
+        request.lock.lock();
 
-        if (timeouts.containsKey(request)) {
-            throw new DuplicateRequestException(id);
-        }
+        try {
+            Message.ID id = request.getId();
 
-        if (request != get(id)) {
-            throw new InvalidRequestMappingException(id);
-        }
+            if (timeouts.containsKey(request)) {
+                throw new DuplicateRequestException(id);
+            }
 
-        if (log.isTraceEnabled()) {
-            log.trace("Scheduling: {}", request);
-        }
-        else {
-            log.debug("Scheduling: {}", id);
-        }
+            if (request != get(id)) {
+                throw new InvalidRequestMappingException(id);
+            }
+
+            TimeoutTask task = new TimeoutTask(request);
 
-        TimeoutTask task = new TimeoutTask(request);
+            ScheduledFuture<?> tf = scheduler.schedule(task, request.getTimeout(), request.getTimeoutUnit());
 
-        ScheduledFuture<?> tf = scheduler.schedule(task, request.getTimeout(), request.getTimeoutUnit());
+            task.setTimeoutFuture(tf);
 
-        task.setTimeoutFuture(tf);
+            timeouts.put(request, task);
 
-        timeouts.put(request, task);
+            if (log.isTraceEnabled()) {
+                log.trace("Scheduled: {}", request);
+            }
+            else {
+                log.debug("Scheduled: {}", id);
+            }
+        }
+        finally {
+            request.lock.unlock();
+        }
     }
 
     public void cancel(final Request request) {
         assert request != null;
 
-        Message.ID id = request.getId();
+        request.lock.lock();
 
-        TimeoutTask task = timeouts.remove(request);
+        try {
+            Message.ID id = request.getId();
 
-        if (task == null) {
-            throw new MissingRequestTimeoutException(id);
-        }
+            TimeoutTask task = timeouts.remove(request);
 
-        if (remove(id) != request) {
-            throw new InvalidRequestMappingException(id);
-        }
+            if (task == null) {
+                throw new MissingRequestTimeoutException(id);
+            }
 
-        if (log.isTraceEnabled()) {
-            log.trace("Canceling: {}", request);
-        }
-        else {
-            log.debug("Canceling: {}", id);
-        }
+            if (remove(id) != request) {
+                throw new InvalidRequestMappingException(id);
+            }
 
-        ScheduledFuture<?> sf = task.getTimeoutFuture();
+            if (log.isTraceEnabled()) {
+                log.trace("Canceling: {}", request);
+            }
+            else {
+                log.debug("Canceling: {}", id);
+            }
 
-        if (sf != null) {
-            sf.cancel(false);
+            ScheduledFuture<?> sf = task.getTimeoutFuture();
+
+            if (sf != null) {
+                sf.cancel(false);
+            }
+        }
+        finally {
+            request.lock.unlock();
         }
     }
 
     private void timeout(final Request request) {
         assert request != null;
 
-        Message.ID id = request.getId();
+        request.lock.lock();
 
-        if (log.isTraceEnabled()) {
-            log.trace("Triggering: {}", request);
-        }
-        else {
-            log.debug("Triggering: {}", id);
-        }
+        try {
+            Message.ID id = request.getId();
 
-        TimeoutTask task = timeouts.remove(request);
-
-        if (task == null) {
-            throw new MissingRequestTimeoutException(id);
-        }
-
-        if (remove(id) != request) {
-            throw new InvalidRequestMappingException(id);
+            if (log.isTraceEnabled()) {
+                log.trace("Triggering: {}", request);
+            }
+            else {
+                log.debug("Triggering: {}", id);
+            }
+
+            TimeoutTask task = timeouts.remove(request);
+
+            if (task == null) {
+                throw new MissingRequestTimeoutException(id);
+            }
+
+            if (remove(id) != request) {
+                throw new InvalidRequestMappingException(id);
+            }
+
+            // If the request has not been signaled, then its a timeout :-(
+            if (!request.isSignaled()) {
+                request.timeout();
+            }
         }
-
-        // If the request has not been signaled, then its a timeout :-(
-        if (!request.isSignaled()) {
-            request.timeout();
+        finally {
+            request.lock.unlock();
         }
     }
 
@@ -215,6 +251,26 @@
         for (Request request : requests) {
             timeout(request);
         }
+    }
+
+    public void clear() {
+        int l;
+
+        l = requests.size();
+
+        if (l > 0) {
+            log.warn("Purging " + l + " request(s)");
+        }
+
+        requests.clear();
+
+        l = timeouts.size();
+
+        if (l > 0) {
+            log.warn("Purging " + l + " timeouts(s)");
+        }
+
+        timeouts.clear();
     }
 
     public void close() {