You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/04/12 10:40:52 UTC
svn commit: r527836 - in /mina/trunk/core/src:
main/java/org/apache/mina/filter/reqres/ test/java/org/apache/mina/filter/
test/java/org/apache/mina/filter/reqres/
Author: trustin
Date: Thu Apr 12 01:40:51 2007
New Revision: 527836
URL: http://svn.apache.org/viewvc?view=rev&rev=527836
Log:
Resolved issue: DIRMINA-92 (Utility classes for asynchronous request-response protocols.)
* Added org.apache.mina.filter.reqres package
* Fixed a trivial bug in StreamWriteFilterTest
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java (with props)
mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/
mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java (with props)
Modified:
mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class Request {
+ private final Object id;
+ private final Object message;
+ private final long timeoutMillis;
+ private TimerTask timerTask;
+
+ public Request(Object id, Object message, long timeoutMillis) {
+ this(id, message, timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public Request(Object id, Object message, long timeout, TimeUnit unit) {
+ if (id == null) {
+ throw new NullPointerException("id");
+ }
+ if (message == null) {
+ throw new NullPointerException("message");
+ }
+ if (timeout < 0) {
+ throw new IllegalArgumentException(
+ "timeout: " + timeout + " (expected: 0+)");
+ } else if (timeout == 0) {
+ timeout = Long.MAX_VALUE;
+ }
+
+ if (unit == null) {
+ throw new NullPointerException("unit");
+ }
+
+ this.id = id;
+ this.message = message;
+ this.timeoutMillis = unit.toMillis(timeout);
+ }
+
+ public Object getId() {
+ return id;
+ }
+
+ public Object getMessage() {
+ return message;
+ }
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+
+ if (o == null) {
+ return false;
+ }
+
+ if (!(o instanceof Request)) {
+ return false;
+ }
+
+ Request that = (Request) o;
+ return this.getId().equals(that.getId());
+ }
+
+ @Override
+ public String toString() {
+ String timeout = (getTimeoutMillis() == Long.MAX_VALUE)?
+ "max" : String.valueOf(getTimeoutMillis());
+
+ return "request: { id=" + getId() +
+ ", timeout=" + timeout + ", message=" + getMessage() + " }";
+ }
+
+ TimerTask getTimerTask() {
+ return timerTask;
+ }
+
+ void setTimerTask(TimerTask timerTask) {
+ this.timerTask = timerTask;
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.util.SessionLog;
+
+/**
+ * A filter that correlates written {@link Request}s with the responses
+ * received for them.
+ * <p>
+ * When a {@link Request} is written, it is recorded in a per-session request
+ * store under its ID and only the message it carries is passed down the
+ * chain.  Once the write is reported as sent, a timeout task is scheduled on
+ * this filter's timer.  A received message for which the
+ * {@link ResponseInspector} yields a request ID is looked up in the store and
+ * forwarded as a {@link Response}; any other message is forwarded untouched.
+ * If the timeout fires first, or the session is closed while the request is
+ * still pending, a {@link RequestTimeoutException} is reported through
+ * <tt>exceptionCaught</tt>.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class RequestResponseFilter extends IoFilterAdapter {
+
+    private static final String REQUEST_STORE = RequestResponseFilter.class.getName() + ".requestStore";
+    private static final String UNFINISHED_TASKS = RequestResponseFilter.class.getName() + ".unfinishedTasks";
+
+    private static int timerId = 0;
+
+    private final ResponseInspector responseInspector;
+    private final Timer timer = new Timer("RequestTimer-" + (timerId++), true);
+
+    /**
+     * Creates a new filter.
+     *
+     * @param responseInspector tells responses apart from other messages and
+     *                          extracts their request ID and type
+     * @throws NullPointerException if <tt>responseInspector</tt> is <tt>null</tt>
+     */
+    public RequestResponseFilter(ResponseInspector responseInspector) {
+        if (responseInspector == null) {
+            throw new NullPointerException("responseInspector");
+        }
+        this.responseInspector = responseInspector;
+    }
+
+    /**
+     * Installs the per-session request store and unfinished-task set.
+     */
+    @Override
+    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+        IoSession session = parent.getSession();
+        session.setAttribute(REQUEST_STORE, new ConcurrentHashMap<Object, Request>());
+        session.setAttribute(UNFINISHED_TASKS, new LinkedHashSet<TimerTask>());
+    }
+
+    /**
+     * Removes the per-session request store and unfinished-task set.
+     */
+    @Override
+    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+        IoSession session = parent.getSession();
+        session.removeAttribute(UNFINISHED_TASKS);
+        session.removeAttribute(REQUEST_STORE);
+    }
+
+    /**
+     * Stops the timer thread this filter is using for processing request timeout.
+     */
+    @Override
+    public void destroy() {
+        timer.cancel();
+    }
+
+    /**
+     * Forwards non-response messages unchanged and turns response messages
+     * into {@link Response}s bound to their originating {@link Request}.
+     * A response whose request is unknown (for example because it already
+     * timed out) is logged at debug level and swallowed.
+     */
+    @Override
+    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
+        Object requestId = responseInspector.getRequestId(message);
+        if (requestId == null) {
+            // Not a response message. Ignore.
+            nextFilter.messageReceived(session, message);
+            return;
+        }
+
+        // Retrieve (or remove) the corresponding request.
+        ResponseType type = responseInspector.getResponseType(message);
+        if (type == null) {
+            nextFilter.exceptionCaught(
+                    session,
+                    new IllegalStateException(
+                            responseInspector.getClass().getName() +
+                            "#getResponseType() may not return null."));
+            // Stop here: switching on a null enum below would throw a
+            // NullPointerException on top of the error already reported.
+            return;
+        }
+
+        Map<Object, Request> requestStore = getRequestStore(session);
+
+        Request request;
+        switch (type) {
+        case WHOLE:
+        case PARTIAL_LAST:
+            request = requestStore.remove(requestId);
+            break;
+        case PARTIAL:
+            request = requestStore.get(requestId);
+            break;
+        default:
+            throw new InternalError();
+        }
+
+        if (request == null) {
+            // A response message without request. Swallow the event because
+            // the response might have arrived too late.
+            if (SessionLog.isDebugEnabled(session)) {
+                SessionLog.debug(
+                        session,
+                        "Unknown request ID '" + requestId +
+                        "' for the response message. Timed out already?: " + message);
+            }
+            return;
+        }
+
+        // Found a matching request.
+        // Cancel the timeout task unless more partial responses will follow.
+        if (type != ResponseType.PARTIAL) {
+            cancelTimeout(session, request);
+        }
+
+        // And forward the event.
+        nextFilter.messageReceived(session, new Response(request, message, type));
+    }
+
+    /**
+     * Cancels the timeout task of a completed request and drops it from the
+     * session's unfinished-task set so that finished requests do not pile up
+     * there until the session is closed.
+     */
+    private void cancelTimeout(IoSession session, Request request) {
+        TimerTask task = request.getTimerTask();
+        if (task == null) {
+            return;
+        }
+
+        task.cancel();
+
+        Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+        if (unfinishedTasks != null) {
+            synchronized (unfinishedTasks) {
+                unfinishedTasks.remove(task);
+            }
+        }
+    }
+
+    /**
+     * Registers a written {@link Request} in the request store and forwards
+     * a write request exposing only the message it carries.  Writes of other
+     * message types are forwarded unchanged.  A request whose ID is already
+     * pending is reported as an {@link IllegalStateException} and not written.
+     */
+    @Override
+    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+        Object message = writeRequest.getMessage();
+        if (!(message instanceof Request)) {
+            nextFilter.filterWrite(session, writeRequest);
+            return;
+        }
+
+        Request request = (Request) message;
+        ConcurrentMap<Object, Request> requestStore = getRequestStore(session);
+        Object oldValue = requestStore.putIfAbsent(request.getId(), request);
+        if (oldValue != null) {
+            nextFilter.exceptionCaught(
+                    session,
+                    new IllegalStateException("Duplicate request ID: " + request.getId()));
+            // Do not send the duplicate: requests are equal by ID, so its
+            // timeout task would evict the request already registered under
+            // this ID and a single response would be ambiguous.
+            return;
+        }
+
+        nextFilter.filterWrite(session, new RequestWriteRequest(writeRequest));
+    }
+
+    /**
+     * Schedules the timeout task for a sent {@link Request} and forwards the
+     * event with the original (unwrapped) write request.  Other write
+     * requests are forwarded unchanged.
+     */
+    @Override
+    public void messageSent(final NextFilter nextFilter, final IoSession session, WriteRequest writeRequest) throws Exception {
+        if (!(writeRequest instanceof RequestWriteRequest)) {
+            nextFilter.messageSent(session, writeRequest);
+            return;
+        }
+
+        // Schedule a task to be executed on timeout.
+        RequestWriteRequest wrappedRequest = (RequestWriteRequest) writeRequest;
+        WriteRequest actualRequest = wrappedRequest.getWriteRequest();
+        final Request request = (Request) actualRequest.getMessage();
+
+        // Find the timeout date avoiding overflow.
+        Date timeoutDate = new Date(System.currentTimeMillis());
+        if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate.getTime()) {
+            timeoutDate.setTime(Long.MAX_VALUE);
+        } else {
+            timeoutDate.setTime(timeoutDate.getTime() + request.getTimeoutMillis());
+        }
+
+        TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
+        request.setTimerTask(timeoutTask);
+
+        // Add the timeout task to the unfinished task set.
+        Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+        synchronized (unfinishedTasks) {
+            unfinishedTasks.add(timeoutTask);
+        }
+
+        // Schedule the timeout task.
+        timer.schedule(timeoutTask, timeoutDate);
+
+        // and forward the original write request.
+        nextFilter.messageSent(session, actualRequest);
+    }
+
+    /**
+     * Times out every request that is still pending on the closed session
+     * and then forwards the event.
+     */
+    @Override
+    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+        // Copy the unfinished task set to avoid unnecessary lock acquisition.
+        // Copying will be cheap because there won't be that many requests queued.
+        Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+        Collection<TimerTask> unfinishedTasksCopy;
+        synchronized (unfinishedTasks) {
+            unfinishedTasksCopy = new ArrayList<TimerTask>(unfinishedTasks);
+            unfinishedTasks.clear();
+        }
+
+        // Generate timeout artificially.  cancel() returns true only for a
+        // task that has not run yet, so no task is executed twice.
+        for (TimerTask task: unfinishedTasksCopy) {
+            if (task.cancel()) {
+                task.run();
+            }
+        }
+
+        // Clear the request store just in case we missed something, though it's unlikely.
+        getRequestStore(session).clear();
+
+        // Now tell the main subject.
+        nextFilter.sessionClosed(session);
+    }
+
+    /**
+     * Returns the request store of the session, or <tt>null</tt> if the
+     * attribute is not set.
+     */
+    @SuppressWarnings("unchecked")
+    private ConcurrentMap<Object, Request> getRequestStore(IoSession session) {
+        return (ConcurrentMap<Object, Request>) session.getAttribute(REQUEST_STORE);
+    }
+
+    /**
+     * Returns the unfinished-task set of the session, or <tt>null</tt> if the
+     * attribute is not set.  Callers synchronize on the returned set.
+     */
+    @SuppressWarnings("unchecked")
+    private Set<TimerTask> getUnfinishedTasks(IoSession session) {
+        return (Set<TimerTask>) session.getAttribute(UNFINISHED_TASKS);
+    }
+
+    /**
+     * Reports a {@link RequestTimeoutException} for a request that is still
+     * pending when the task runs.
+     */
+    private class TimeoutTask extends TimerTask {
+        private final NextFilter filter;
+        private final Request request;
+        private final IoSession session;
+
+        private TimeoutTask(NextFilter filter, Request request, IoSession session) {
+            this.filter = filter;
+            this.request = request;
+            this.session = session;
+        }
+
+        @Override
+        public void run() {
+            Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+            if (unfinishedTasks != null) {
+                synchronized (unfinishedTasks) {
+                    unfinishedTasks.remove(this);
+                }
+            }
+
+            // The store is gone once the filter has been removed from the
+            // chain.  An exception escaping from here would terminate the
+            // timer thread, so treat a missing store as "nothing pending".
+            ConcurrentMap<Object, Request> requestStore = getRequestStore(session);
+            if (requestStore != null && requestStore.remove(request.getId(), request)) {
+                // Throw the exception only when it's really timed out.
+                filter.exceptionCaught(session, new RequestTimeoutException(request));
+            }
+        }
+    }
+
+    /**
+     * Wraps the write request of a {@link Request} so that filters further
+     * down the chain see the message carried by the request instead of the
+     * request itself.
+     */
+    private static class RequestWriteRequest extends WriteRequestWrapper {
+        public RequestWriteRequest(WriteRequest writeRequest) {
+            super(writeRequest);
+        }
+
+        @Override
+        public Object getMessage() {
+            return ((Request) super.getMessage()).getMessage();
+        }
+    }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} raised when no final response to a {@link Request}
+ * arrived in time.  The request concerned is available from
+ * {@link #getRequest()}.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class RequestTimeoutException extends IOException {
+    private static final long serialVersionUID = 5546784978950631652L;
+
+    private final Request request;
+
+    /**
+     * Creates a new exception without a detail message.
+     *
+     * @throws NullPointerException if <tt>request</tt> is <tt>null</tt>
+     */
+    public RequestTimeoutException(Request request) {
+        this.request = nonNull(request);
+    }
+
+    /**
+     * Creates a new exception with the specified detail message.
+     *
+     * @throws NullPointerException if <tt>request</tt> is <tt>null</tt>
+     */
+    public RequestTimeoutException(Request request, String s) {
+        super(s);
+        this.request = nonNull(request);
+    }
+
+    /**
+     * Creates a new exception with the specified detail message and cause.
+     *
+     * @throws NullPointerException if <tt>request</tt> is <tt>null</tt>
+     */
+    public RequestTimeoutException(Request request, String message, Throwable cause) {
+        super(message, cause);
+        this.request = nonNull(request);
+    }
+
+    /**
+     * Creates a new exception with the specified cause.
+     *
+     * @throws NullPointerException if <tt>request</tt> is <tt>null</tt>
+     */
+    public RequestTimeoutException(Request request, Throwable cause) {
+        super(cause);
+        this.request = nonNull(request);
+    }
+
+    /**
+     * Returns the request which has timed out.
+     */
+    public Request getRequest() {
+        return request;
+    }
+
+    /**
+     * Returns the given request, rejecting <tt>null</tt>.
+     */
+    private static Request nonNull(Request request) {
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        return request;
+    }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ * A received response message paired with the {@link Request} it answers
+ * and with its {@link ResponseType}.
+ * <p>
+ * Two responses are equal when their requests and their types are equal;
+ * the message itself does not take part in the comparison.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class Response {
+    private final Request request;
+    private final ResponseType type;
+    private final Object message;
+
+    /**
+     * Creates a new response.
+     *
+     * @param request the request this response answers
+     * @param message the received response message
+     * @param type    the type of the response
+     * @throws NullPointerException if any argument is <tt>null</tt>
+     */
+    public Response(Request request, Object message, ResponseType type) {
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        if (message == null) {
+            throw new NullPointerException("message");
+        }
+        if (type == null) {
+            throw new NullPointerException("type");
+        }
+
+        this.request = request;
+        this.message = message;
+        this.type = type;
+    }
+
+    /**
+     * Returns the request this response answers.
+     */
+    public Request getRequest() {
+        return request;
+    }
+
+    /**
+     * Returns the type of this response.
+     */
+    public ResponseType getType() {
+        return type;
+    }
+
+    /**
+     * Returns the received response message.
+     */
+    public Object getMessage() {
+        return message;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRequest().getId().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null test is needed.
+        if (!(o instanceof Response)) {
+            return false;
+        }
+
+        Response other = (Response) o;
+        return getRequest().equals(other.getRequest())
+                && getType().equals(other.getType());
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        buf.append("response: { requestId=").append(getRequest().getId());
+        buf.append(", type=").append(getType());
+        buf.append(", message=").append(getMessage());
+        buf.append(" }");
+        return buf.toString();
+    }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ * Inspects received messages on behalf of {@link RequestResponseFilter},
+ * which uses the answers to decide whether a message is a response and, if
+ * so, which pending {@link Request} it belongs to.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ResponseInspector {
+ /**
+ * Returns the ID of the request the specified message responds to.
+ * {@link RequestResponseFilter} treats a <tt>null</tt> return value as
+ * "not a response message" and forwards such a message unchanged.
+ */
+ Object getRequestId(Object message);
+ /**
+ * Returns the type of the specified response message.
+ * {@link RequestResponseFilter} calls this only after
+ * {@link #getRequestId(Object)} returned a non-<tt>null</tt> ID for the
+ * same message, and reports an {@link IllegalStateException} if the
+ * result is <tt>null</tt>.
+ */
+ ResponseType getResponseType(Object message);
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public enum ResponseType {
+ WHOLE, PARTIAL, PARTIAL_LAST;
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java?view=diff&rev=527836&r1=527835&r2=527836
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java Thu Apr 12 01:40:51 2007
@@ -546,7 +546,7 @@
@Override
protected boolean argumentMatches( Object expected, Object actual )
{
- if( expected instanceof WriteRequest && expected instanceof WriteRequest )
+ if( expected instanceof WriteRequest && actual instanceof WriteRequest )
{
WriteRequest w1 = ( WriteRequest ) expected;
WriteRequest w2 = ( WriteRequest ) actual;
Added: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java (added)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.reqres;
+
+import java.net.SocketAddress;
+
+import org.apache.mina.common.DefaultWriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.util.SessionLog;
+import org.easymock.AbstractMatcher;
+import org.easymock.MockControl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link RequestResponseFilter}.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class RequestResponseFilterTest {
+
+ private RequestResponseFilter filter;
+ private IoSession session;
+
+ private IoFilterChain chain;
+ private NextFilter nextFilter;
+ private MockControl nextFilterControl;
+
+ private final WriteRequestMatcher matcher = new WriteRequestMatcher();
+
+ /**
+ * Builds a fresh {@link DummySession}, a {@link RequestResponseFilter} backed by
+ * {@link MessageInspector}, a stub filter chain and a mocked {@link NextFilter},
+ * then runs the filter's add callbacks.
+ *
+ * @throws Exception if the filter's add callbacks fail
+ */
+ @Before
+ public void setUp() throws Exception {
+ session = new DummySession();
+ filter = new RequestResponseFilter(new MessageInspector());
+
+ // Set up mock objects. The chain's doClose/doWrite are deliberate no-ops;
+ // in this test the chain is only handed to the add/remove callbacks.
+ chain = new AbstractIoFilterChain(session) {
+ @Override
+ protected void doClose(IoSession session) throws Exception {
+ }
+
+ @Override
+ protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception {
+ }
+ };
+ nextFilterControl = MockControl.createControl(NextFilter.class);
+ nextFilter = (NextFilter) nextFilterControl.getMock();
+
+ // Initialize the filter.
+ filter.onPreAdd(chain, "reqres", nextFilter);
+ filter.onPostAdd(chain, "reqres", nextFilter);
+ // Adding the filter is expected to leave at least one attribute on the session.
+ Assert.assertFalse(session.getAttributeKeys().isEmpty());
+ }
+
+ /**
+ * Runs the filter's remove callbacks, checks that the session is left without
+ * attributes, and destroys the filter.
+ *
+ * @throws Exception if the filter's remove callbacks or destroy fail
+ */
+ @After
+ public void tearDown() throws Exception {
+ // Destroy the filter.
+ filter.onPreRemove(chain, "reqres", nextFilter);
+ filter.onPostRemove(chain, "reqres", nextFilter);
+ // Drop the SessionLog.LOGGER attribute so it does not count in the check below
+ // (presumably set by logging during the test - confirm).
+ session.removeAttribute(SessionLog.LOGGER);
+ // After removal the filter must not have left any attribute on the session.
+ Assert.assertTrue(session.getAttributeKeys().isEmpty());
+
+ filter.destroy();
+ filter = null;
+ }
+
+ /**
+ * Writes one request, then feeds the filter a {@link ResponseType#WHOLE} response
+ * twice. Only one wrapped {@link Response} is recorded as expected on the next
+ * filter, so the duplicate must not be forwarded.
+ *
+ * @throws Exception if the filter throws
+ */
+ @Test
+ public void testWholeResponse() throws Exception {
+ Request req = new Request(1, new Object(), Long.MAX_VALUE);
+ Response res = new Response(req, new Message(1, ResponseType.WHOLE), ResponseType.WHOLE);
+ WriteRequest rwr = new DefaultWriteRequest(req);
+
+ // Record: a write carrying the request's bare message, messageSent with the
+ // original write request, and exactly one messageReceived with the Response.
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+ // Compare write requests by message and capture the one actually forwarded.
+ nextFilterControl.setMatcher(matcher);
+ nextFilter.messageSent(session, rwr);
+ nextFilter.messageReceived(session, res);
+
+ // Replay
+ nextFilterControl.replay();
+ filter.filterWrite(nextFilter, session, rwr);
+ // Report the forwarded write request (captured by the matcher) as sent.
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ filter.messageReceived(nextFilter, session, res.getMessage());
+ filter.messageReceived(nextFilter, session, res.getMessage()); // Ignored
+
+ // Verify
+ nextFilterControl.verify();
+ }
+
+ /**
+ * Writes one request, then feeds the filter a {@link ResponseType#PARTIAL} message
+ * followed by a {@link ResponseType#PARTIAL_LAST} message. Both are expected on the
+ * next filter as {@link Response}s; the same two messages delivered again afterwards
+ * have no recorded expectation and so must not be forwarded.
+ *
+ * @throws Exception if the filter throws
+ */
+ @Test
+ public void testPartialResponse() throws Exception {
+ Request req = new Request(1, new Object(), Long.MAX_VALUE);
+ Response res1 = new Response(req, new Message(1, ResponseType.PARTIAL), ResponseType.PARTIAL);
+ Response res2 = new Response(req, new Message(1, ResponseType.PARTIAL_LAST), ResponseType.PARTIAL_LAST);
+ WriteRequest rwr = new DefaultWriteRequest(req);
+
+ // Record: the bare-message write, messageSent with the original write request,
+ // then one messageReceived per partial response.
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+ // Compare write requests by message and capture the one actually forwarded.
+ nextFilterControl.setMatcher(matcher);
+ nextFilter.messageSent(session, rwr);
+ nextFilter.messageReceived(session, res1);
+ nextFilter.messageReceived(session, res2);
+
+ // Replay
+ nextFilterControl.replay();
+ filter.filterWrite(nextFilter, session, rwr);
+ // Report the forwarded write request (captured by the matcher) as sent.
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ filter.messageReceived(nextFilter, session, res1.getMessage());
+ filter.messageReceived(nextFilter, session, res2.getMessage());
+ filter.messageReceived(nextFilter, session, res1.getMessage()); // Ignored
+ filter.messageReceived(nextFilter, session, res2.getMessage()); // Ignored
+
+ // Verify
+ nextFilterControl.verify();
+ }
+
+ /**
+ * Writes a request with a 10ms timeout and lets it expire before any response
+ * arrives. The next filter is expected to receive a {@link RequestTimeoutException}
+ * for the request and no messageReceived call, even though a response is delivered
+ * after the wait.
+ *
+ * @throws Exception if the filter throws or the sleep is interrupted
+ */
+ @Test
+ public void testWholeResponseTimeout() throws Exception {
+ Request req = new Request(1, new Object(), 10); // 10ms timeout
+ Response res = new Response(req, new Message(1, ResponseType.WHOLE), ResponseType.WHOLE);
+ WriteRequest rwr = new DefaultWriteRequest(req);
+
+ // Record: the bare-message write, messageSent with the original write request,
+ // and a timeout exception for the request. No messageReceived is recorded.
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+ // Compare write requests by message and capture the one actually forwarded.
+ nextFilterControl.setMatcher(matcher);
+ nextFilter.messageSent(session, rwr);
+ nextFilter.exceptionCaught(session, new RequestTimeoutException(req));
+ // Compare timeout exceptions by the request they carry.
+ nextFilterControl.setMatcher(new ExceptionMatcher());
+
+ // Replay
+ nextFilterControl.replay();
+ filter.filterWrite(nextFilter, session, rwr);
+ // Report the forwarded write request (captured by the matcher) as sent.
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ Thread.sleep(300); // Wait until the request times out.
+ filter.messageReceived(nextFilter, session, res.getMessage()); // Ignored
+
+ // Verify
+ nextFilterControl.verify();
+ }
+
+ /**
+ * Writes a request with a 10ms timeout, delivers one {@link ResponseType#PARTIAL}
+ * message, then lets the request expire. The next filter is expected to receive
+ * that first partial response and a {@link RequestTimeoutException}; messages
+ * delivered after the wait have no recorded expectation and must not be forwarded.
+ *
+ * @throws Exception if the filter throws or the sleep is interrupted
+ */
+ @Test
+ public void testPartialResponseTimeout() throws Exception {
+ Request req = new Request(1, new Object(), 10); // 10ms timeout
+ Response res1 = new Response(req, new Message(1, ResponseType.PARTIAL), ResponseType.PARTIAL);
+ Response res2 = new Response(req, new Message(1, ResponseType.PARTIAL_LAST), ResponseType.PARTIAL_LAST);
+ WriteRequest rwr = new DefaultWriteRequest(req);
+
+ // Record: the bare-message write, messageSent with the original write request,
+ // the first partial response, and a timeout exception for the request.
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+ // Compare write requests by message and capture the one actually forwarded.
+ nextFilterControl.setMatcher(matcher);
+ nextFilter.messageSent(session, rwr);
+ nextFilter.messageReceived(session, res1);
+ nextFilter.exceptionCaught(session, new RequestTimeoutException(req));
+ // Compare timeout exceptions by the request they carry.
+ nextFilterControl.setMatcher(new ExceptionMatcher());
+
+ // Replay
+ nextFilterControl.replay();
+ filter.filterWrite(nextFilter, session, rwr);
+ // Report the forwarded write request (captured by the matcher) as sent.
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ filter.messageReceived(nextFilter, session, res1.getMessage());
+ Thread.sleep(300); // Wait until the request times out.
+ filter.messageReceived(nextFilter, session, res2.getMessage()); // Ignored
+ filter.messageReceived(nextFilter, session, res1.getMessage()); // Ignored
+
+ // Verify
+ nextFilterControl.verify();
+ }
+
+ /**
+ * Checks what happens to outstanding requests when the session closes. After a
+ * completed request/response exchange, two requests with an effectively unlimited
+ * timeout are written and the session is closed. The next filter is expected to
+ * receive a {@link RequestTimeoutException} for each of the two pending requests,
+ * plus the sessionClosed event, and nothing for the already-answered request.
+ *
+ * @throws Exception if the filter throws
+ */
+ @Test
+ public void testTimeoutByDisconnection() throws Exception {
+ // We run a test case that doesn't raise a timeout to make sure
+ // the timeout is not raised again by disconnection.
+ testWholeResponse();
+ // Discard the expectations recorded by testWholeResponse() and start over.
+ nextFilterControl.reset();
+
+ Request req1 = new Request(1, new Object(), Long.MAX_VALUE);
+ Request req2 = new Request(2, new Object(), Long.MAX_VALUE);
+ WriteRequest rwr1 = new DefaultWriteRequest(req1);
+ WriteRequest rwr2 = new DefaultWriteRequest(req2);
+
+ // Record: write + messageSent for each request, one timeout exception per
+ // pending request, and the forwarded sessionClosed event.
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req1.getMessage()));
+ // Compare write requests by message and capture the one actually forwarded.
+ nextFilterControl.setMatcher(matcher);
+ nextFilter.messageSent(session, rwr1);
+ nextFilter.filterWrite(session, new DefaultWriteRequest(req2.getMessage()));
+ nextFilter.messageSent(session, rwr2);
+ nextFilter.exceptionCaught(session, new RequestTimeoutException(req1));
+ // Compare timeout exceptions by the request they carry.
+ nextFilterControl.setMatcher(new ExceptionMatcher());
+ nextFilter.exceptionCaught(session, new RequestTimeoutException(req2));
+ nextFilter.sessionClosed(session);
+
+ // Replay
+ nextFilterControl.replay();
+ filter.filterWrite(nextFilter, session, rwr1);
+ // Report each forwarded write request (captured by the matcher) as sent.
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ filter.filterWrite(nextFilter, session, rwr2);
+ filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+ filter.sessionClosed(nextFilter, session);
+
+ // Verify
+ nextFilterControl.verify();
+ }
+
+ private static class Message {
+     /** Identifier that {@link MessageInspector} reports as the request id. */
+     private final int requestId;
+     /** Response type that {@link MessageInspector} reports for this message. */
+     private final ResponseType responseType;
+
+     /**
+      * Creates an immutable test message.
+      *
+      * @param requestId the id returned by {@link #getId()}
+      * @param responseType the type returned by {@link #getType()}
+      */
+     private Message(int requestId, ResponseType responseType) {
+         this.requestId = requestId;
+         this.responseType = responseType;
+     }
+
+     /**
+      * @return the id given at construction
+      */
+     public int getId() {
+         return requestId;
+     }
+
+     /**
+      * @return the response type given at construction
+      */
+     public ResponseType getType() {
+         return responseType;
+     }
+ }
+
+ private static class MessageInspector implements ResponseInspector {
+
+ public Object getRequestId(Object message) {
+ if (!(message instanceof Message)) {
+ return null;
+ }
+
+ return ((Message) message).getId();
+ }
+
+ public ResponseType getResponseType(Object message) {
+ if (!(message instanceof Message)) {
+ return null;
+ }
+
+ return ((Message) message).getType();
+ }
+ }
+
+ private static class DummySession extends BaseIoSession {
+
+     // --- The only accessor that returns a real object. ---
+
+     /** @return a new {@link IoHandlerAdapter} on every call */
+     public IoHandler getHandler() {
+         return new IoHandlerAdapter();
+     }
+
+     // --- Write-queue counters: always zero. ---
+
+     /** @return always <tt>0</tt> */
+     public int getScheduledWriteBytes() {
+         return 0;
+     }
+
+     /** @return always <tt>0</tt> */
+     public int getScheduledWriteMessages() {
+         return 0;
+     }
+
+     // --- Remaining accessors: stubbed to return null. ---
+
+     /** @return always <tt>null</tt> */
+     public IoService getService() {
+         return null;
+     }
+
+     /** @return always <tt>null</tt> */
+     public IoSessionConfig getConfig() {
+         return null;
+     }
+
+     /** @return always <tt>null</tt> */
+     public IoFilterChain getFilterChain() {
+         return null;
+     }
+
+     /** @return always <tt>null</tt> */
+     public TransportType getTransportType() {
+         return null;
+     }
+
+     /** @return always <tt>null</tt> */
+     public SocketAddress getLocalAddress() {
+         return null;
+     }
+
+     /** @return always <tt>null</tt> */
+     public SocketAddress getRemoteAddress() {
+         return null;
+     }
+
+     /** Does nothing. */
+     @Override
+     protected void updateTrafficMask() {
+     }
+ }
+
+ private static class WriteRequestMatcher extends AbstractMatcher {
+     /** The most recent actual {@link WriteRequest} that was compared. */
+     private WriteRequest lastWriteRequest;
+
+     /**
+      * @return the actual {@link WriteRequest} from the latest comparison of two
+      *         write requests, or <tt>null</tt> if none has happened yet
+      */
+     public WriteRequest getLastWriteRequest() {
+         return lastWriteRequest;
+     }
+
+     /**
+      * Treats two {@link WriteRequest}s as matching when their messages are equal,
+      * and remembers the actual one. Any other argument pair is delegated to the
+      * superclass.
+      */
+     @Override
+     protected boolean argumentMatches(Object expected, Object actual) {
+         if (!(actual instanceof WriteRequest) || !(expected instanceof WriteRequest)) {
+             return super.argumentMatches(expected, actual);
+         }
+
+         WriteRequest expectedRequest = (WriteRequest) expected;
+         WriteRequest actualRequest = (WriteRequest) actual;
+         // Compare first, record second: the actual request is only remembered
+         // once the message comparison has completed.
+         boolean sameMessage = expectedRequest.getMessage().equals(actualRequest.getMessage());
+         lastWriteRequest = actualRequest;
+         return sameMessage;
+     }
+ }
+
+ private static class ExceptionMatcher extends AbstractMatcher {
+     /**
+      * Treats two {@link RequestTimeoutException}s as matching when the requests
+      * they carry are equal. Any other argument pair is delegated to the superclass.
+      */
+     @Override
+     protected boolean argumentMatches(Object expected, Object actual) {
+         if (!(actual instanceof RequestTimeoutException) || !(expected instanceof RequestTimeoutException)) {
+             return super.argumentMatches(expected, actual);
+         }
+
+         RequestTimeoutException expectedTimeout = (RequestTimeoutException) expected;
+         RequestTimeoutException actualTimeout = (RequestTimeoutException) actual;
+         return expectedTimeout.getRequest().equals(actualTimeout.getRequest());
+     }
+ }
+}
Propchange: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date