You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/06/20 08:05:02 UTC
svn commit: r548946 - in /mina/trunk/core/src:
main/java/org/apache/mina/filter/reqres/Request.java
main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
Author: trustin
Date: Tue Jun 19 23:05:02 2007
New Revision: 548946
URL: http://svn.apache.org/viewvc?view=rev&rev=548946
Log:
* Changed RequestResponseFilter to use ScheduledExecutorService for processing timeed-out messages
* Fixed memory leak in RequestResponseFilter
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java Tue Jun 19 23:05:02 2007
@@ -20,9 +20,9 @@
package org.apache.mina.filter.reqres;
import java.util.NoSuchElementException;
-import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -35,7 +35,8 @@
private final Object message;
private final boolean useResponseQueue;
private final long timeoutMillis;
- private volatile TimerTask timerTask;
+ private volatile Runnable timeoutTask;
+ private volatile ScheduledFuture timeoutFuture;
private final BlockingQueue<Object> responses = new LinkedBlockingQueue<Object>();
private volatile boolean endOfResponses;
@@ -194,11 +195,19 @@
", timeout=" + timeout + ", message=" + getMessage() + " }";
}
- TimerTask getTimerTask() {
- return timerTask;
+ Runnable getTimeoutTask() {
+ return timeoutTask;
}
- void setTimerTask(TimerTask timerTask) {
- this.timerTask = timerTask;
+ void setTimeoutTask(Runnable timeoutTask) {
+ this.timeoutTask = timeoutTask;
+ }
+
+ ScheduledFuture getTimeoutFuture() {
+ return timeoutFuture;
+ }
+
+ void setTimeoutFuture(ScheduledFuture timeoutFuture) {
+ this.timeoutFuture = timeoutFuture;
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java Tue Jun 19 23:05:02 2007
@@ -20,15 +20,16 @@
package org.apache.mina.filter.reqres;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
@@ -46,29 +47,35 @@
private static final String RESPONSE_INSPECTOR = RequestResponseFilter.class.getName() + ".responseInspector";
private static final String REQUEST_STORE = RequestResponseFilter.class.getName() + ".requestStore";
- private static final String UNFINISHED_TASKS = RequestResponseFilter.class.getName() + ".unfinishedTasks";
+ private static final String UNRESPONDED_REQUESTS = RequestResponseFilter.class.getName() + ".unrespondedRequests";
- private static int timerId = 0;
-
private final ResponseInspectorFactory responseInspectorFactory;
- private final Timer timer = new Timer("RequestTimer-" + (timerId++), true);
+ private final ScheduledExecutorService timeoutScheduler;
- public RequestResponseFilter(final ResponseInspector responseInspector) {
+ public RequestResponseFilter(final ResponseInspector responseInspector, ScheduledExecutorService timeoutScheduler) {
if (responseInspector == null) {
throw new NullPointerException("responseInspector");
}
+ if (timeoutScheduler == null) {
+ throw new NullPointerException("timeoutScheduler");
+ }
this.responseInspectorFactory = new ResponseInspectorFactory() {
public ResponseInspector getResponseInspector() {
return responseInspector;
}
};
+ this.timeoutScheduler = timeoutScheduler;
}
- public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory) {
+ public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory, ScheduledExecutorService timeoutScheduler) {
if (responseInspectorFactory == null) {
throw new NullPointerException("responseInspectorFactory");
}
+ if (timeoutScheduler == null) {
+ throw new NullPointerException("timeoutScheduler");
+ }
this.responseInspectorFactory = responseInspectorFactory;
+ this.timeoutScheduler = timeoutScheduler;
}
@Override
@@ -76,25 +83,17 @@
IoSession session = parent.getSession();
session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory.getResponseInspector());
session.setAttribute(REQUEST_STORE, new ConcurrentHashMap<Object, Request>());
- session.setAttribute(UNFINISHED_TASKS, new LinkedHashSet<TimerTask>());
+ session.setAttribute(UNRESPONDED_REQUESTS, new LinkedHashSet<Request>());
}
@Override
public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
IoSession session = parent.getSession();
- session.removeAttribute(UNFINISHED_TASKS);
+ session.removeAttribute(UNRESPONDED_REQUESTS);
session.removeAttribute(REQUEST_STORE);
session.removeAttribute(RESPONSE_INSPECTOR);
}
- /**
- * Stops the timer thread this filter is using for processing request timeout.
- */
- @Override
- public void destroy() {
- timer.cancel();
- }
-
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
ResponseInspector responseInspector = (ResponseInspector) session.getAttribute(RESPONSE_INSPECTOR);
@@ -133,8 +132,8 @@
if (request == null) {
// A response message without request. Swallow the event because
// the response might have arrived too late.
- if (SessionLog.isDebugEnabled(session)) {
- SessionLog.debug(
+ if (SessionLog.isWarnEnabled(session)) {
+ SessionLog.warn(
session,
"Unknown request ID '" + requestId +
"' for the response message. Timed out already?: " + message);
@@ -143,9 +142,13 @@
// Found a matching request.
// Cancel the timeout task if needed.
if (type != ResponseType.PARTIAL) {
- TimerTask task = request.getTimerTask();
- if (task != null) {
- task.cancel();
+ ScheduledFuture scheduledFuture = request.getTimeoutFuture();
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+ synchronized (unrespondedRequests) {
+ unrespondedRequests.remove(request);
+ }
}
}
@@ -165,7 +168,7 @@
}
Request request = (Request) message;
- if (request.getTimerTask() != null) {
+ if (request.getTimeoutFuture() != null) {
nextFilter.exceptionCaught(
session,
new IllegalArgumentException("Request can not be reused."));
@@ -199,17 +202,19 @@
}
TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
- request.setTimerTask(timeoutTask);
+
+ // Schedule the timeout task.
+ ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
+ timeoutTask, request.getTimeoutMillis(), TimeUnit.MILLISECONDS);
+ request.setTimeoutTask(timeoutTask);
+ request.setTimeoutFuture(timeoutFuture);
// Add the timtoue task to the unfinished task set.
- Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
- synchronized (unfinishedTasks) {
- unfinishedTasks.add(timeoutTask);
+ Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+ synchronized (unrespondedRequests) {
+ unrespondedRequests.add(request);
}
- // Schedule the timeout task.
- timer.schedule(timeoutTask, timeoutDate);
-
// and forward the original write request.
nextFilter.messageSent(session, wrappedRequest.getWriteRequest());
} else {
@@ -221,17 +226,17 @@
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
// Copy the unifished task set to avoid unnecessary lock acquisition.
// Copying will be cheap because there won't be that many requests queued.
- Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
- Collection<TimerTask> unfinishedTasksCopy;
- synchronized (unfinishedTasks) {
- unfinishedTasksCopy = new ArrayList<TimerTask>(unfinishedTasks);
- unfinishedTasks.clear();
+ Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+ List<Request> unrespondedRequestsCopy;
+ synchronized (unrespondedRequests) {
+ unrespondedRequestsCopy = new ArrayList<Request>(unrespondedRequests);
+ unrespondedRequests.clear();
}
// Generate timeout artifically.
- for (TimerTask task: unfinishedTasksCopy) {
- if (task.cancel()) {
- task.run();
+ for (Request r: unrespondedRequestsCopy) {
+ if (r.getTimeoutFuture().cancel(false)) {
+ r.getTimeoutTask().run();
}
}
@@ -248,11 +253,11 @@
}
@SuppressWarnings("unchecked")
- private Set<TimerTask> getUnfinishedTasks(IoSession session) {
- return (Set<TimerTask>) session.getAttribute(UNFINISHED_TASKS);
+ private Set<Request> getUnrespondedRequests(IoSession session) {
+ return (Set<Request>) session.getAttribute(UNRESPONDED_REQUESTS);
}
- private class TimeoutTask extends TimerTask {
+ private class TimeoutTask implements Runnable {
private final NextFilter filter;
private final Request request;
private final IoSession session;
@@ -263,12 +268,11 @@
this.session = session;
}
- @Override
public void run() {
- Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
- if (unfinishedTasks != null) {
- synchronized (unfinishedTasks) {
- unfinishedTasks.remove(this);
+ Set<Request> unrespondedRequests = getUnrespondedRequests(session);
+ if (unrespondedRequests != null) {
+ synchronized (unrespondedRequests) {
+ unrespondedRequests.remove(request);
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java?view=diff&rev=548946&r1=548945&r2=548946
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java Tue Jun 19 23:05:02 2007
@@ -21,6 +21,8 @@
import java.net.SocketAddress;
import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.mina.common.DefaultWriteRequest;
import org.apache.mina.common.IoFilterChain;
@@ -51,6 +53,7 @@
*/
public class RequestResponseFilterTest extends TestCase {
+ private ScheduledExecutorService scheduler;
private RequestResponseFilter filter;
private IoSession session;
@@ -62,8 +65,9 @@
@Before
public void setUp() throws Exception {
+ scheduler = Executors.newScheduledThreadPool(1);
session = new DummySession();
- filter = new RequestResponseFilter(new MessageInspector());
+ filter = new RequestResponseFilter(new MessageInspector(), scheduler);
// Set up mock objects.
chain = new AbstractIoFilterChain(session) {
@@ -90,10 +94,11 @@
filter.onPreRemove(chain, "reqres", nextFilter);
filter.onPostRemove(chain, "reqres", nextFilter);
session.removeAttribute(SessionLog.LOGGER);
+ session.removeAttribute(SessionLog.PREFIX);
Assert.assertTrue(session.getAttributeKeys().isEmpty());
-
filter.destroy();
filter = null;
+ scheduler.shutdown();
}
@Test