You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2013/06/14 11:16:17 UTC
[2/2] git commit: request, response filter,
produce IoFuture for waiting/listening the async reception of a
response to a request
request, response filter, produce IoFuture for waiting/listening the async reception of a response to a request
Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/a68de05c
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/a68de05c
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/a68de05c
Branch: refs/heads/trunk
Commit: a68de05cc12a20c2c6208cb33c0ac81b6f24325e
Parents: f7322f7
Author: jvermillard <jv...@apache.org>
Authored: Fri Jun 14 11:15:06 2013 +0200
Committer: jvermillard <jv...@apache.org>
Committed: Fri Jun 14 11:15:06 2013 +0200
----------------------------------------------------------------------
.../mina/api/AbstractIoFutureListener.java | 38 +++++
.../org/apache/mina/filter/query/Request.java | 28 ++++
.../apache/mina/filter/query/RequestFilter.java | 141 +++++++++++++++++++
.../apache/mina/filter/query/RequestFuture.java | 66 +++++++++
.../filter/query/RequestTimeoutException.java | 30 ++++
.../org/apache/mina/filter/query/Response.java | 34 +++++
.../apache/mina/session/DefaultWriteFuture.java | 4 +-
.../mina/filter/query/RequestFilterTest.java | 114 +++++++++++++++
8 files changed, 453 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java b/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
new file mode 100644
index 0000000..0757458
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/api/AbstractIoFutureListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.api;
+
+/**
+ *
+ * Convenient base implementation for {@link IoFutureListener}. if something wrong happen the exception is rethrown,
+ * which will produce an exception caught event for the session
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public abstract class AbstractIoFutureListener<V> implements IoFutureListener<V> {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void exception(Throwable t) {
+ throw new MinaRuntimeException(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/Request.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/Request.java b/core/src/main/java/org/apache/mina/filter/query/Request.java
new file mode 100644
index 0000000..ef9070b
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/Request.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ * A request message, which should be responded by a {@link Response}.
+ */
+public interface Request {
+
+ Object requestId();
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
new file mode 100644
index 0000000..963bea1
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFilter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.mina.api.AbstractIoFilter;
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.session.AttributeKey;
+
+/**
+ * A filter providing {@link IoFuture} for request/response protocol.
+ *
+ * You send a request to the connected end-point and a {@link IoFuture} is provided for handling the received request
+ * response.
+ *
+ * The filter find the received message matching the request, using {@link Request#requestId()} and
+ * {@link Response#requestId()}.
+ *
+ * <pre>
+ * RequestFilter rq = new RequestFilter();
+ *
+ * service.setFilters(.., rq);
+ *
+ * IoFuture<Response> future = rq.request(session, message, 10000);
+ *
+ * response.register(new AbstractIoFutureListener<Response>() {
+ * @Override
+ * public void completed(Response result) {
+ * System.err.println("request completed ! response : " + result);
+ * }
+ * });
+ * </pre>
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> extends AbstractIoFilter {
+
+ /**
+ *
+ * @param session
+ * @param request
+ * @param timeoutInMs
+ * @return
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public IoFuture<RESPONSE> request(IoSession session, REQUEST request, long timeoutInMs) {
+ Map inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+ IoFuture<RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, System.currentTimeMillis()
+ + timeoutInMs, request.requestId());
+
+ // save the future for completion
+ inFlight.put(request.requestId(), future);
+ session.write(request);
+ return future;
+ }
+
+ @SuppressWarnings("rawtypes")
+ static final AttributeKey<Map> IN_FLIGHT_REQUESTS = new AttributeKey<Map>(Map.class, "request.in.flight");
+
+ // last time we checked the timeouts
+ private long lastTimeoutCheck = 0;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void sessionOpened(IoSession session) {
+ session.setAttribute(IN_FLIGHT_REQUESTS, new ConcurrentHashMap());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
+ if (message instanceof Response) {
+ Object id = ((Response) message).requestId();
+ if (id != null) {
+ // got a response, let's find the query
+ Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+ RequestFuture<REQUEST, RESPONSE> future = (RequestFuture<REQUEST, RESPONSE>) inFlight.remove(id);
+ if (future != null) {
+ future.complete((RESPONSE) message);
+ }
+ }
+ }
+
+ // check for timeout
+ long now = System.currentTimeMillis();
+ if (lastTimeoutCheck + 1000 < now) {
+ lastTimeoutCheck = now;
+ Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+ for (Object v : inFlight.values()) {
+ ((RequestFuture<?, ?>) v).timeoutIfNeeded(now);
+ }
+ }
+ // trigger the next filter
+ super.messageReceived(session, message, controller);
+ }
+
+ @Override
+ public void messageSent(IoSession session, Object message) {
+ // check for timeout
+ long now = System.currentTimeMillis();
+ if (lastTimeoutCheck + 1000 < now) {
+ lastTimeoutCheck = now;
+ Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+ for (Object v : inFlight.values()) {
+ ((RequestFuture<?, ?>) v).timeoutIfNeeded(now);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc} cancel remaining requests
+ */
+ @Override
+ public void sessionClosed(IoSession session) {
+ Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
+ for (Object v : inFlight.values()) {
+ ((RequestFuture<?, ?>) v).cancel(true);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
new file mode 100644
index 0000000..de8564f
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestFuture.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import java.util.Map;
+
+import org.apache.mina.api.IoSession;
+import org.apache.mina.util.AbstractIoFuture;
+
+/**
+ * A future representing the promise of a reply to a request.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ *
+ * @param <REQUEST> the request type
+ * @param <RESPONSE> the response type
+ */
+class RequestFuture<REQUEST extends Request, RESPONSE extends Response> extends AbstractIoFuture<RESPONSE> {
+
+ private final IoSession session;
+
+ private final long timeout;
+
+ private final Object id;
+
+ public RequestFuture(IoSession session, long timeout, Object id) {
+ this.session = session;
+ this.timeout = timeout;
+ this.id = id;
+ }
+
+ @Override
+ protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+ throw new IllegalStateException("not implemented");
+ }
+
+ void complete(RESPONSE response) {
+ setResult(response);
+ }
+
+ @SuppressWarnings("rawtypes")
+ void timeoutIfNeeded(long time) {
+ if (timeout < time) {
+ Map inFlight = session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+ inFlight.remove(id);
+ setException(new RequestTimeoutException());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java b/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
new file mode 100644
index 0000000..c6ec253
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/RequestTimeoutException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ * An exception occurring when the request took too much time.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+@SuppressWarnings("serial")
+public class RequestTimeoutException extends Exception {
+
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/filter/query/Response.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/filter/query/Response.java b/core/src/main/java/org/apache/mina/filter/query/Response.java
new file mode 100644
index 0000000..2b9fac2
--- /dev/null
+++ b/core/src/main/java/org/apache/mina/filter/query/Response.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+/**
+ *
+ * A message which can be possibly a response to a request.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface Response {
+
+ /**
+ * @return the identifier of the associated request or <code>null</code> if none
+ */
+ public Object requestId();
+}
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java b/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
index 5f48354..0d8eaac 100644
--- a/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
+++ b/core/src/main/java/org/apache/mina/session/DefaultWriteFuture.java
@@ -24,14 +24,14 @@ import org.apache.mina.util.AbstractIoFuture;
/**
* The default implementation for the {@link IoFuture} returned by {@link IoSession#writeWithFuture(Object)}
- *
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class DefaultWriteFuture extends AbstractIoFuture<Void> {
@Override
protected boolean cancelOwner(boolean mayInterruptIfRunning) {
- throw new RuntimeException("not implemented");
+ throw new IllegalStateException("not implemented");
}
/**
http://git-wip-us.apache.org/repos/asf/mina/blob/a68de05c/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java b/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
new file mode 100644
index 0000000..588e021
--- /dev/null
+++ b/core/src/test/java/org/apache/mina/filter/query/RequestFilterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.query;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.Map;
+
+import org.apache.mina.api.IoFuture;
+import org.apache.mina.api.IoSession;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ * Unit test for {@link RequestFilter}
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class RequestFilterTest {
+
+ @SuppressWarnings("rawtypes")
+ private RequestFilter rq = new RequestFilter();
+
+ @Test
+ public void session_open_initialize_in_flight_container() {
+ IoSession session = mock(IoSession.class);
+
+ // run
+ rq.sessionOpened(session);
+
+ // verify
+ verify(session).setAttribute(same(RequestFilter.IN_FLIGHT_REQUESTS), any(Map.class));
+ verifyNoMoreInteractions(session);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void request_and_produce_a_future() {
+ IoSession session = mock(IoSession.class);
+
+ Request r = mock(Request.class);
+ when(r.requestId()).thenReturn("ID");
+
+ Map m = mock(Map.class);
+
+ when(session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS)).thenReturn(m);
+
+ // run
+ IoFuture f = rq.request(session, r, 12345);
+
+ // verify
+ Assert.assertFalse(f.isDone());
+ Assert.assertFalse(f.isCancelled());
+
+ verify(r, times(2)).requestId();
+ verify(session).write(r);
+ verify(m).put("ID", f);
+ verify(session).getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+ verifyNoMoreInteractions(session, m);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void receive_a_messagre_and_find_the_future_to_complete() {
+ IoSession session = mock(IoSession.class);
+
+ Response r = mock(Response.class);
+ when(r.requestId()).thenReturn("ID");
+
+ Map m = mock(Map.class);
+
+ when(session.getAttribute(RequestFilter.IN_FLIGHT_REQUESTS)).thenReturn(m);
+
+ RequestFuture f = mock(RequestFuture.class);
+
+ when(m.remove("ID")).thenReturn(f);
+
+ ReadFilterChainController ctl = mock(ReadFilterChainController.class);
+
+ // run
+ rq.messageReceived(session, r, ctl);
+
+ // verify
+ verify(session, times(2)).getAttribute(RequestFilter.IN_FLIGHT_REQUESTS);
+ verify(m).remove("ID");
+ verify(r).requestId();
+ verify(f).complete(r);
+
+ verify(m).values();
+
+ verify(ctl).callReadNextFilter(r);
+ verifyNoMoreInteractions(r, m, session, f, ctl);
+ }
+}