You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2013/06/18 21:53:35 UTC

[3/3] git commit: use a scheduled executor for request timeout

use a scheduled executor for request timeout


Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/50d27d9e
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/50d27d9e
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/50d27d9e

Branch: refs/heads/trunk
Commit: 50d27d9e60838e9a4906ac797305e88247470f5c
Parents: a3a8bcc
Author: jvermillard <jv...@apache.org>
Authored: Tue Jun 18 21:52:47 2013 +0200
Committer: jvermillard <jv...@apache.org>
Committed: Tue Jun 18 21:52:47 2013 +0200

----------------------------------------------------------------------
 .../apache/mina/filter/query/RequestFilter.java | 37 +++++---------------
 .../apache/mina/filter/query/RequestFuture.java | 30 +++++++++++-----
 2 files changed, 30 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/50d27d9e/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
index 5539b89..77f8827 100644
--- a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
@@ -21,6 +21,9 @@ package org.apache.mina.filter.query;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.mina.api.AbstractIoFilter;
 import org.apache.mina.api.IoFuture;
@@ -63,13 +66,15 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e
      * @param session the session where to write the request
      * @param request the request to be issued
      * @param timeoutInMs the timeout in milli-seconds (doesn't work Work-in-progress).
-     * @return
+     * @return the {@link IoFuture} for waiting on or listening for the completion of this request.
      */
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public IoFuture<RESPONSE> request(IoSession session, REQUEST request, long timeoutInMs) {
         Map inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
-        IoFuture<RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, System.currentTimeMillis()
-                + timeoutInMs, request.requestId());
+        RequestFuture<REQUEST, RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, request.requestId());
+
+        // schedule a timeout task
+        future.setTimeoutFuture(schedExec.schedule(future.timeout, timeoutInMs, TimeUnit.MILLISECONDS));
 
         // save the future for completion
         inFlight.put(request.requestId(), future);
@@ -80,8 +85,7 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e
     @SuppressWarnings("rawtypes")
     static final AttributeKey<Map> IN_FLIGHT_REQUESTS = new AttributeKey<Map>(Map.class, "request.in.flight");
 
-    // last time we checked the timeouts
-    private long lastTimeoutCheck = 0;
+    private ScheduledExecutorService schedExec = Executors.newScheduledThreadPool(1);
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -104,32 +108,9 @@ public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> e
             }
         }
 
-        // // check for timeout
-        // long now = System.currentTimeMillis();
-        // if (lastTimeoutCheck + 1000 < now) {
-        // lastTimeoutCheck = now;
-        // Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
-        // for (Object v : inFlight.values()) {
-        // ((RequestFuture<?, ?>) v).timeoutIfNeeded(now);
-        // }
-        // }
-        // trigger the next filter
         super.messageReceived(session, message, controller);
     }
 
-    @Override
-    public void messageSent(IoSession session, Object message) {
-        // check for timeout
-        // long now = System.currentTimeMillis();
-        // if (lastTimeoutCheck + 1000 < now) {
-        // lastTimeoutCheck = now;
-        // Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
-        // for (Object v : inFlight.values()) {
-        // ((RequestFuture<?, ?>) v).timeoutIfNeeded(now);
-        // }
-        // }
-    }
-
     /**
      * {@inheritDoc} cancel remaining requests
      */

http://git-wip-us.apache.org/repos/asf/mina/blob/50d27d9e/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
index de8564f..7b15443 100644
--- a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
@@ -20,6 +20,7 @@
 package org.apache.mina.filter.query;
 
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 
 import org.apache.mina.api.IoSession;
 import org.apache.mina.util.AbstractIoFuture;
@@ -36,13 +37,12 @@ class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends
 
     private final IoSession session;
 
-    private final long timeout;
-
     private final Object id;
 
-    public RequestFuture(IoSession session, long timeout, Object id) {
+    private ScheduledFuture<?> schedFuture;
+
+    public RequestFuture(IoSession session, Object id) {
         this.session = session;
-        this.timeout = timeout;
         this.id = id;
     }
 
@@ -52,15 +52,27 @@ class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends
     }
 
     void complete(RESPONSE response) {
+        if (schedFuture != null) {
+            schedFuture.cancel(true);
+        }
         setResult(response);
     }
 
-    @SuppressWarnings("rawtypes")
-    void timeoutIfNeeded(long time) {
-        if (timeout < time) {
+    void setTimeoutFuture(ScheduledFuture<?> schedFuture) {
+        this.schedFuture = schedFuture;
+    }
+
+    Runnable timeout = new Runnable() {
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public void run() {
             Map inFlight = session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
-            inFlight.remove(id);
+            if (inFlight != null) {
+                inFlight.remove(id);
+            }
             setException(new RequestTimeoutException());
+
         }
-    }
+    };
 }
\ No newline at end of file