You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2013/06/14 11:16:16 UTC

[1/2] git commit: utilities method for creating coap messages

Updated Branches:
  refs/heads/trunk efd141462 -> a68de05cc


utilities method for creating coap messages


Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/f7322f73
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/f7322f73
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/f7322f73

Branch: refs/heads/trunk
Commit: f7322f7370be5be08bf21c13377fb74d2c599d72
Parents: efd1414
Author: jvermillard <jv...@apache.org>
Authored: Fri Jun 14 11:13:42 2013 +0200
Committer: jvermillard <jv...@apache.org>
Committed: Fri Jun 14 11:13:42 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/mina/coap/CoapMessage.java  | 49 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/f7322f73/coap/src/main/java/org/apache/mina/coap/CoapMessage.java
----------------------------------------------------------------------
diff --git a/coap/src/main/java/org/apache/mina/coap/CoapMessage.java b/coap/src/main/java/org/apache/mina/coap/CoapMessage.java
index a366284..4bdafa1 100644
--- a/coap/src/main/java/org/apache/mina/coap/CoapMessage.java
+++ b/coap/src/main/java/org/apache/mina/coap/CoapMessage.java
@@ -19,15 +19,44 @@
  */
 package org.apache.mina.coap;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Comparator;
 
+import org.apache.mina.filter.query.Request;
+import org.apache.mina.filter.query.Response;
+
 /**
  * A representation of CoAP message following the CoAP RFC.
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public class CoapMessage {
+public class CoapMessage implements Request, Response {
+
+    /**
+     * Build a GET request for the given resource path.
+     * 
+     * @param url the resource path, converted to URI-Path options by {@link #optionsForUrl(String)}
+     * @param confimable <code>true</code> for a confirmable message, <code>false</code> for a non confirmable one
+     * @return the new GET message
+     */
+    public static final CoapMessage get(String url, boolean confimable) {
+        // System.nanoTime() may be negative and '%' keeps the sign of the dividend : mask the 16 low bits instead,
+        // so the message identifier always stays in the [0, 65535] range
+        int messageId = (int) (System.nanoTime() & 0xFFFF);
+
+        return new CoapMessage(1, confimable ? MessageType.CONFIRMABLE : MessageType.NON_CONFIRMABLE,
+                CoapCode.GET.getCode(), messageId, null, optionsForUrl(url), null);
+    }
+
+    /**
+     * Build a POST request for the given resource path.
+     * 
+     * @param url the resource path, converted to URI-Path options by {@link #optionsForUrl(String)}
+     * @param confimable <code>true</code> for a confirmable message, <code>false</code> for a non confirmable one
+     * @param payload the content to post
+     * @return the new POST message
+     */
+    public static final CoapMessage post(String url, boolean confimable, byte[] payload) {
+        // System.nanoTime() may be negative and '%' keeps the sign of the dividend : mask the 16 low bits instead,
+        // so the message identifier always stays in the [0, 65535] range
+        int messageId = (int) (System.nanoTime() & 0xFFFF);
+
+        // the request code must be POST (it used to be GET, copied from the get(..) factory)
+        return new CoapMessage(1, confimable ? MessageType.CONFIRMABLE : MessageType.NON_CONFIRMABLE,
+                CoapCode.POST.getCode(), messageId, null, optionsForUrl(url), payload);
+    }
+
+    /**
+     * Convert a resource path into one URI-Path option per path segment.
+     * 
+     * @param url the resource path, using '/' as segment separator; a leading '/' is accepted
+     * @return the URI-Path options in path order, empty if the path is empty or reduced to "/"
+     */
+    private static final CoapOption[] optionsForUrl(String url) {
+        // an absolute path starts with '/' : drop it, otherwise split(..) produces an empty first segment which
+        // would be sent as a bogus empty URI-Path option
+        String path = url.startsWith("/") ? url.substring(1) : url;
+        if (path.isEmpty()) {
+            return new CoapOption[0];
+        }
+
+        String[] segments = path.split("/");
+        CoapOption[] options = new CoapOption[segments.length];
+        for (int i = 0; i < segments.length; i++) {
+            try {
+                options[i] = new CoapOption(CoapOptionType.URI_PATH, segments[i].getBytes("UTF-8"));
+            } catch (UnsupportedEncodingException e) {
+                // UTF-8 support is mandatory on every JVM, this cannot happen
+                throw new IllegalStateException(e);
+            }
+        }
+        return options;
+    }
 
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
@@ -77,6 +106,14 @@ public class CoapMessage {
         });
     }
 
+    /**
+     * {@inheritDoc}
+     * 
+     * For a CoAP message the identifier is the value of the <code>id</code> field.
+     */
+    @Override
+    public Object requestId() {
+        return id;
+    }
+
     public int getVersion() {
         return version;
     }
@@ -112,8 +149,16 @@ public class CoapMessage {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
+        CoapCode c = CoapCode.fromCode(code);
+        String cstr;
+        if (c == null) {
+            cstr = String.valueOf(code);
+        } else {
+            cstr = c.getText();
+        }
+
         builder.append("CoapMessage [version=").append(version).append(", type=").append(type).append(", code=")
-                .append(code).append(", id=").append(id).append(", token=").append(Arrays.toString(token))
+                .append(cstr).append(", id=").append(id).append(", token=").append(Arrays.toString(token))
                 .append(", payload=").append(Arrays.toString(payload)).append(", options=")
                 .append(Arrays.toString(options)).append("]");
 


[2/2] git commit: request, response filter, produce IoFuture for waiting/listening the async reception of a response to a request

Posted by jv...@apache.org.
request, response filter, produce IoFuture for waiting/listening the async reception of a response to a request


Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/a68de05c
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/a68de05c
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/a68de05c

Branch: refs/heads/trunk
Commit: a68de05cc12a20c2c6208cb33c0ac81b6f24325e
Parents: f7322f7
Author: jvermillard <jv...@apache.org>
Authored: Fri Jun 14 11:15:06 2013 +0200
Committer: jvermillard <jv...@apache.org>
Committed: Fri Jun 14 11:15:06 2013 +0200

----------------------------------------------------------------------
 .../mina/api/AbstractIoFutureListener.java      |  38 +++++
 .../org/apache/mina/filter/query/Request.java   |  28 ++++
 .../apache/mina/filter/query/RequestFilter.java | 141 +++++++++++++++++++
 .../apache/mina/filter/query/RequestFuture.java |  66 +++++++++
 .../filter/query/RequestTimeoutException.java   |  30 ++++
 .../org/apache/mina/filter/query/Response.java  |  34 +++++
 .../apache/mina/session/DefaultWriteFuture.java |   4 +-
 .../mina/filter/query/RequestFilterTest.java    | 114 +++++++++++++++
 8 files changed, 453 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java b/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
new file mode 100644
index 0000000..0757458
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.api;
+
+/**
+ * 
+ * Convenient base implementation for {@link IoFutureListener}. If something goes wrong, the exception is rethrown
+ * wrapped in a {@link MinaRuntimeException}, which will produce an exception caught event for the session.
+ * 
+ * @param <V> the type of result the listener is notified with
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public abstract class AbstractIoFutureListener<V> implements IoFutureListener<V> {
+
+    /**
+     * {@inheritDoc}
+     * 
+     * This implementation rethrows the error wrapped in a {@link MinaRuntimeException}.
+     */
+    @Override
+    public void exception(Throwable t) {
+        throw new MinaRuntimeException(t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/Request.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/Request.java b/core/src/main/java/org/apache/mina/filter/query/Request.java
new file mode 100644
index 0000000..ef9070b
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/Request.java
@@ -0,0 +1,28 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ * A request message, which should be answered by a {@link Response}.
+ */
+public interface Request {
+
+    /**
+     * @return the identifier of this request; {@link RequestFilter} uses it as the key for finding the pending
+     *         future when a message with the same {@link Response#requestId()} is received
+     */
+    Object requestId();
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
new file mode 100644
index 0000000..963bea1
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
@@ -0,0 +1,141 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.api.AbstractIoFilter;
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.session.AttributeKey;
+
+/**
+ * A filter providing {@link IoFuture} for request/response protocols.
+ * 
+ * You send a request to the connected end-point and an {@link IoFuture} is provided for handling the received
+ * response.
+ * 
+ * The filter finds the received message matching the request, using {@link Request#requestId()} and
+ * {@link Response#requestId()}.
+ * 
+ * <pre>
+ * RequestFilter rq = new RequestFilter();
+ * 
+ * service.setFilters(.., rq);
+ * 
+ * IoFuture&lt;Response&gt; future = rq.request(session, message, 10000);
+ * 
+ * future.register(new AbstractIoFutureListener&lt;Response&gt;() {
+ *     &#064;Override
+ *     public void completed(Response result) {
+ *         System.err.println(&quot;request completed ! response : &quot; + result);
+ *     }
+ * });
+ * </pre>
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> extends AbstractIoFilter {
+
+    /** the session attribute holding the map of pending futures, indexed by request identifier */
+    @SuppressWarnings("rawtypes")
+    static final AttributeKey<Map> IN_FLIGHT_REQUESTS = new AttributeKey<Map>(Map.class, "request.in.flight");
+
+    /** the minimal delay between two sweeps of the in-flight requests looking for timeouts */
+    private static final long TIMEOUT_CHECK_PERIOD_IN_MS = 1000;
+
+    // last time we checked the timeouts
+    // NOTE(review): this value is shared by all the sessions going through this filter instance and is not
+    // synchronized
+    private long lastTimeoutCheck = 0;
+
+    /**
+     * Send a request and provide a future for the matching response.
+     * 
+     * The future is stored in the in-flight map of the session, under {@link Request#requestId()}, before the
+     * request is written. The map is created by {@link #sessionOpened(IoSession)}.
+     * 
+     * @param session the session used for writing the request
+     * @param request the request to send
+     * @param timeoutInMs the delay, in milliseconds from now, after which the future can be failed with a
+     *        {@link RequestTimeoutException}; expired requests are only detected when a message is received or sent
+     * @return the future completed with the response carrying the same identifier as the request
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public IoFuture<RESPONSE> request(IoSession session, REQUEST request, long timeoutInMs) {
+        Map inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+        IoFuture<RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, System.currentTimeMillis()
+                + timeoutInMs, request.requestId());
+
+        // save the future for completion
+        inFlight.put(request.requestId(), future);
+        session.write(request);
+        return future;
+    }
+
+    /**
+     * {@inheritDoc} create the container for the in-flight requests of the session
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void sessionOpened(IoSession session) {
+        session.setAttribute(IN_FLIGHT_REQUESTS, new ConcurrentHashMap());
+    }
+
+    /**
+     * {@inheritDoc} complete the future waiting for the received response, if any
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
+        if (message instanceof Response) {
+            Object id = ((Response) message).requestId();
+            if (id != null) {
+                // got a response, let's find the query
+                Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+                RequestFuture<REQUEST, RESPONSE> future = (RequestFuture<REQUEST, RESPONSE>) inFlight.remove(id);
+                if (future != null) {
+                    future.complete((RESPONSE) message);
+                }
+            }
+        }
+
+        checkTimeouts(session);
+
+        // trigger the next filter
+        super.messageReceived(session, message, controller);
+    }
+
+    /**
+     * {@inheritDoc} use the event for checking the timeouts
+     */
+    @Override
+    public void messageSent(IoSession session, Object message) {
+        checkTimeouts(session);
+    }
+
+    /**
+     * {@inheritDoc} cancel remaining requests
+     */
+    @Override
+    public void sessionClosed(IoSession session) {
+        Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+        for (Object v : inFlight.values()) {
+            ((RequestFuture<?, ?>) v).cancel(true);
+        }
+    }
+
+    /**
+     * Fail the expired in-flight requests of the session, if the last check is older than
+     * {@link #TIMEOUT_CHECK_PERIOD_IN_MS}.
+     */
+    private void checkTimeouts(IoSession session) {
+        long now = System.currentTimeMillis();
+        if (lastTimeoutCheck + TIMEOUT_CHECK_PERIOD_IN_MS < now) {
+            lastTimeoutCheck = now;
+            Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+            for (Object v : inFlight.values()) {
+                ((RequestFuture<?, ?>) v).timeoutIfNeeded(now);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
new file mode 100644
index 0000000..de8564f
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import java.util.Map;
+
+import org.apache.mina.api.IoSession;
+import org.apache.mina.util.AbstractIoFuture;
+
+/**
+ * A future representing the promise of a reply to a request.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ * 
+ * @param <REQUEST> the request type
+ * @param <RESPONSE> the response type
+ */
+class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends AbstractIoFuture<RESPONSE> {
+
+    /** the session holding the in-flight requests map this future is stored in */
+    private final IoSession session;
+
+    /** the absolute time (same time base as the value given to {@link #timeoutIfNeeded(long)}) of expiration */
+    private final long timeout;
+
+    /** the identifier of the request, key of this future in the in-flight requests map */
+    private final Object id;
+
+    /**
+     * @param session the session the request is sent on
+     * @param timeout the absolute time after which the request is considered as timed out
+     * @param id the identifier of the request
+     */
+    public RequestFuture(IoSession session, long timeout, Object id) {
+        this.session = session;
+        this.timeout = timeout;
+        this.id = id;
+    }
+
+    /**
+     * Stop waiting for the response : the future is removed from the in-flight requests of the session.
+     * 
+     * RequestFilter cancels every pending future when the session is closed, so this method must not throw.
+     * NOTE(review): assumes AbstractIoFuture#cancel(boolean) delegates to this method and expects
+     * <code>true</code> when the cancellation is accepted - confirm against AbstractIoFuture.
+     * 
+     * @param mayInterruptIfRunning ignored, no task is running on behalf of this future
+     * @return always <code>true</code>
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+        Map inFlight = session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+        if (inFlight != null) {
+            inFlight.remove(id);
+        }
+        return true;
+    }
+
+    /**
+     * Complete the future with the received response.
+     * 
+     * @param response the response matching the request
+     */
+    void complete(RESPONSE response) {
+        setResult(response);
+    }
+
+    /**
+     * Fail the future with a {@link RequestTimeoutException} and forget the request, if the expiration time is
+     * strictly before the given time.
+     * 
+     * @param time the current time
+     */
+    @SuppressWarnings("rawtypes")
+    void timeoutIfNeeded(long time) {
+        if (timeout < time) {
+            Map inFlight = session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+            inFlight.remove(id);
+            setException(new RequestTimeoutException());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java b/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
new file mode 100644
index 0000000..c6ec253
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ * An exception occurring when the request took too much time : the future of a request is failed with it when no
+ * response was received before the expiration time.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+@SuppressWarnings("serial")
+public class RequestTimeoutException extends Exception {
+
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/Response.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/Response.java b/core/src/main/java/org/apache/mina/filter/query/Response.java
new file mode 100644
index 0000000..2b9fac2
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/Response.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ * 
+ * A message which can possibly be a response to a {@link Request}.
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface Response {
+
+    /**
+     * @return the identifier of the associated request (the value of {@link Request#requestId()} for that request)
+     *         or <code>null</code> if none
+     */
+    public Object requestId();
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java b/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
index 5f48354..0d8eaac 100644
--- a/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
+++ b/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
@@ -24,14 +24,14 @@ import org.apache.mina.util.AbstractIoFuture;
 
 /**
  * The default implementation for the {@link IoFuture} returned by {@link IoSession#writeWithFuture(Object)}
- *
+ * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class DefaultWriteFuture extends AbstractIoFuture<Void> {
 
     @Override
     protected boolean cancelOwner(boolean mayInterruptIfRunning) {
-        throw new RuntimeException("not implemented");
+        throw new IllegalStateException("not implemented");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java b/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
new file mode 100644
index 0000000..588e021
--- /dev/null
+++ b/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
@@ -0,0 +1,114 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Map;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ * Unit test for {@link RequestFilter}
+ * 
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class RequestFilterTest {
+
+    // the filter under test; as a new instance its last timeout check time is 0, so the first message event
+    // always triggers a sweep of the in-flight requests
+    @SuppressWarnings("rawtypes")
+    private RequestFilter rq = new RequestFilter();
+
+    /**
+     * Opening a session must only store the in-flight requests container as a session attribute.
+     */
+    @Test
+    public void session_open_initialize_in_flight_container() {
+        IoSession session = mock(IoSession.class);
+
+        // run
+        rq.sessionOpened(session);
+
+        // verify
+        verify(session).setAttribute(same(RequestFilter.IN_FLIGHT_REQUESTS), any(Map.class));
+        verifyNoMoreInteractions(session);
+    }
+
+    /**
+     * Sending a request must register a pending future under the request identifier and write the request.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void request_and_produce_a_future() {
+        IoSession session = mock(IoSession.class);
+
+        Request r = mock(Request.class);
+        when(r.requestId()).thenReturn("ID");
+
+        Map m = mock(Map.class);
+
+        when(session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS)).thenReturn(m);
+
+        // run
+        IoFuture f = rq.request(session, r, 12345);
+
+        // verify
+        Assert.assertFalse(f.isDone());
+        Assert.assertFalse(f.isCancelled());
+
+        // the identifier is read twice : once for creating the future, once as key of the in-flight map
+        verify(r, times(2)).requestId();
+        verify(session).write(r);
+        verify(m).put("ID", f);
+        verify(session).getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+        verifyNoMoreInteractions(session, m);
+    }
+
+    /**
+     * Receiving a response must complete the future registered under the same identifier and propagate the message
+     * to the next filter.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void receive_a_messagre_and_find_the_future_to_complete() {
+        IoSession session = mock(IoSession.class);
+
+        Response r = mock(Response.class);
+        when(r.requestId()).thenReturn("ID");
+
+        Map m = mock(Map.class);
+
+        when(session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS)).thenReturn(m);
+
+        RequestFuture f = mock(RequestFuture.class);
+
+        when(m.remove("ID")).thenReturn(f);
+
+        ReadFilterChainController ctl = mock(ReadFilterChainController.class);
+
+        // run
+        rq.messageReceived(session, r, ctl);
+
+        // verify
+        // the in-flight map is fetched twice : once for finding the future, once for the timeout sweep
+        verify(session, times(2)).getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+        verify(m).remove("ID");
+        verify(r).requestId();
+        verify(f).complete(r);
+
+        // the timeout sweep iterates over the pending futures
+        verify(m).values();
+
+        verify(ctl).callReadNextFilter(r);
+        verifyNoMoreInteractions(r, m, session, f, ctl);
+    }
+}