You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/06/20 08:05:02 UTC

svn commit: r548946 - in /mina/trunk/core/src: main/java/org/apache/mina/filter/reqres/Request.java main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java

Author: trustin
Date: Tue Jun 19 23:05:02 2007
New Revision: 548946

URL: http://svn.apache.org/viewvc?view=rev&rev=548946
Log:
* Changed RequestResponseFilter to use ScheduledExecutorService for processing timeed-out messages
* Fixed memory leak in RequestResponseFilter

Modified:
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
    mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java Tue Jun 19 23:05:02 2007
@@ -20,9 +20,9 @@
 package org.apache.mina.filter.reqres;
 
 import java.util.NoSuchElementException;
-import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -35,7 +35,8 @@
     private final Object message;
     private final boolean useResponseQueue;
     private final long timeoutMillis;
-    private volatile TimerTask timerTask;
+    private volatile Runnable timeoutTask;
+    private volatile ScheduledFuture timeoutFuture;
 
     private final BlockingQueue<Object> responses = new LinkedBlockingQueue<Object>();
     private volatile boolean endOfResponses;
@@ -194,11 +195,19 @@
                ", timeout=" + timeout + ", message=" + getMessage() + " }";
     }
     
-    TimerTask getTimerTask() {
-        return timerTask;
+    Runnable getTimeoutTask() {
+        return timeoutTask;
     }
     
-    void setTimerTask(TimerTask timerTask) {
-        this.timerTask = timerTask;
+    void setTimeoutTask(Runnable timeoutTask) {
+        this.timeoutTask = timeoutTask;
+    }
+    
+    ScheduledFuture getTimeoutFuture() {
+        return timeoutFuture;
+    }
+    
+    void setTimeoutFuture(ScheduledFuture timeoutFuture) {
+        this.timeoutFuture = timeoutFuture;
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java Tue Jun 19 23:05:02 2007
@@ -20,15 +20,16 @@
 package org.apache.mina.filter.reqres;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Date;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoFilterChain;
@@ -46,29 +47,35 @@
 
     private static final String RESPONSE_INSPECTOR = RequestResponseFilter.class.getName() + ".responseInspector";
     private static final String REQUEST_STORE = RequestResponseFilter.class.getName() + ".requestStore";
-    private static final String UNFINISHED_TASKS = RequestResponseFilter.class.getName() + ".unfinishedTasks";
+    private static final String UNRESPONDED_REQUESTS = RequestResponseFilter.class.getName() + ".unrespondedRequests";
 
-    private static int timerId = 0;
-    
     private final ResponseInspectorFactory responseInspectorFactory;
-    private final Timer timer = new Timer("RequestTimer-" + (timerId++), true);
+    private final ScheduledExecutorService timeoutScheduler;
     
-    public RequestResponseFilter(final ResponseInspector responseInspector) {
+    public RequestResponseFilter(final ResponseInspector responseInspector, ScheduledExecutorService timeoutScheduler) {
         if (responseInspector == null) {
             throw new NullPointerException("responseInspector");
         }
+        if (timeoutScheduler == null) {
+            throw new NullPointerException("timeoutScheduler");
+        }
         this.responseInspectorFactory = new ResponseInspectorFactory() {
             public ResponseInspector getResponseInspector() {
                 return responseInspector;
             }
         };
+        this.timeoutScheduler = timeoutScheduler;
     }
     
-    public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory) {
+    public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory, ScheduledExecutorService timeoutScheduler) {
         if (responseInspectorFactory == null) {
             throw new NullPointerException("responseInspectorFactory");
         }
+        if (timeoutScheduler == null) {
+            throw new NullPointerException("timeoutScheduler");
+        }
         this.responseInspectorFactory = responseInspectorFactory;
+        this.timeoutScheduler = timeoutScheduler;
     }
     
     @Override
@@ -76,25 +83,17 @@
         IoSession session = parent.getSession();
         session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory.getResponseInspector());
         session.setAttribute(REQUEST_STORE, new ConcurrentHashMap<Object, Request>());
-        session.setAttribute(UNFINISHED_TASKS, new LinkedHashSet<TimerTask>());
+        session.setAttribute(UNRESPONDED_REQUESTS, new LinkedHashSet<Request>());
     }
     
     @Override
     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
         IoSession session = parent.getSession();
-        session.removeAttribute(UNFINISHED_TASKS);
+        session.removeAttribute(UNRESPONDED_REQUESTS);
         session.removeAttribute(REQUEST_STORE);
         session.removeAttribute(RESPONSE_INSPECTOR);
     }
     
-    /**
-     * Stops the timer thread this filter is using for processing request timeout.
-     */
-    @Override
-    public void destroy() {
-        timer.cancel();
-    }
-
     @Override
     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
         ResponseInspector responseInspector = (ResponseInspector) session.getAttribute(RESPONSE_INSPECTOR);
@@ -133,8 +132,8 @@
         if (request == null) {
             // A response message without request. Swallow the event because
             // the response might have arrived too late.
-            if (SessionLog.isDebugEnabled(session)) {
-                SessionLog.debug(
+            if (SessionLog.isWarnEnabled(session)) {
+                SessionLog.warn(
                         session,
                         "Unknown request ID '" + requestId + 
                         "' for the response message. Timed out already?: " + message);
@@ -143,9 +142,13 @@
             // Found a matching request.
             // Cancel the timeout task if needed.
             if (type != ResponseType.PARTIAL) {
-                TimerTask task = request.getTimerTask();
-                if (task != null) {
-                    task.cancel();
+                ScheduledFuture scheduledFuture = request.getTimeoutFuture();
+                if (scheduledFuture != null) {
+                    scheduledFuture.cancel(false);
+                    Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+                    synchronized (unrespondedRequests) {
+                        unrespondedRequests.remove(request);
+                    }
                 }
             }
 
@@ -165,7 +168,7 @@
         }
         
         Request request = (Request) message;
-        if (request.getTimerTask() != null) {
+        if (request.getTimeoutFuture() != null) {
             nextFilter.exceptionCaught(
                     session,
                     new IllegalArgumentException("Request can not be reused."));
@@ -199,17 +202,19 @@
             }
             
             TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
-            request.setTimerTask(timeoutTask);
+            
+            // Schedule the timeout task.
+            ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
+                    timeoutTask, request.getTimeoutMillis(), TimeUnit.MILLISECONDS);
+            request.setTimeoutTask(timeoutTask);
+            request.setTimeoutFuture(timeoutFuture);
             
             // Add the timtoue task to the unfinished task set.
-            Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
-            synchronized (unfinishedTasks) {
-                unfinishedTasks.add(timeoutTask);
+            Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+            synchronized (unrespondedRequests) {
+                unrespondedRequests.add(request);
             }
             
-            // Schedule the timeout task.
-            timer.schedule(timeoutTask, timeoutDate);
-            
             // and forward the original write request.
             nextFilter.messageSent(session, wrappedRequest.getWriteRequest());
         } else {
@@ -221,17 +226,17 @@
     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
         // Copy the unifished task set to avoid unnecessary lock acquisition.
         // Copying will be cheap because there won't be that many requests queued.
-        Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
-        Collection<TimerTask> unfinishedTasksCopy;
-        synchronized (unfinishedTasks) {
-            unfinishedTasksCopy = new ArrayList<TimerTask>(unfinishedTasks);
-            unfinishedTasks.clear();
+        Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+        List<Request> unrespondedRequestsCopy;
+        synchronized (unrespondedRequests) {
+            unrespondedRequestsCopy = new ArrayList<Request>(unrespondedRequests);
+            unrespondedRequests.clear();
         }
         
         // Generate timeout artifically.
-        for (TimerTask task: unfinishedTasksCopy) {
-            if (task.cancel()) {
-                task.run();
+        for (Request r: unrespondedRequestsCopy) {
+            if (r.getTimeoutFuture().cancel(false)) {
+                r.getTimeoutTask().run();
             }
         }
 
@@ -248,11 +253,11 @@
     }
     
     @SuppressWarnings("unchecked")
-    private Set<TimerTask> getUnfinishedTasks(IoSession session) {
-        return (Set<TimerTask>) session.getAttribute(UNFINISHED_TASKS);
+    private Set<Request> getUnrespondedRequests(IoSession session) {
+        return (Set<Request>) session.getAttribute(UNRESPONDED_REQUESTS);
     }
     
-    private class TimeoutTask extends TimerTask {
+    private class TimeoutTask implements Runnable {
         private final NextFilter filter;
         private final Request request;
         private final IoSession session;
@@ -263,12 +268,11 @@
             this.session = session;
         }
 
-        @Override
         public void run() {
-            Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
-            if (unfinishedTasks != null) {
-                synchronized (unfinishedTasks) {
-                    unfinishedTasks.remove(this);
+            Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+            if (unrespondedRequests != null) {
+                synchronized (unrespondedRequests) {
+                    unrespondedRequests.remove(request);
                 }
             }
         

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java Tue Jun 19 23:05:02 2007
@@ -21,6 +21,8 @@
 
 import java.net.SocketAddress;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.mina.common.DefaultWriteRequest;
 import org.apache.mina.common.IoFilterChain;
@@ -51,6 +53,7 @@
  */
 public class RequestResponseFilterTest extends TestCase {
     
+    private ScheduledExecutorService scheduler;
     private RequestResponseFilter filter;
     private IoSession session;
     
@@ -62,8 +65,9 @@
     
     @Before
     public void setUp() throws Exception {
+        scheduler = Executors.newScheduledThreadPool(1);
         session = new DummySession();
-        filter = new RequestResponseFilter(new MessageInspector());
+        filter = new RequestResponseFilter(new MessageInspector(), scheduler);
         
         // Set up mock objects.
         chain = new AbstractIoFilterChain(session) {
@@ -90,10 +94,11 @@
         filter.onPreRemove(chain, "reqres", nextFilter);
         filter.onPostRemove(chain, "reqres", nextFilter);
         session.removeAttribute(SessionLog.LOGGER);
+        session.removeAttribute(SessionLog.PREFIX);
         Assert.assertTrue(session.getAttributeKeys().isEmpty());
-
         filter.destroy();
         filter = null;
+        scheduler.shutdown();
     }
     
     @Test