You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/04/12 10:40:52 UTC

svn commit: r527836 - in /mina/trunk/core/src: main/java/org/apache/mina/filter/reqres/ test/java/org/apache/mina/filter/ test/java/org/apache/mina/filter/reqres/

Author: trustin
Date: Thu Apr 12 01:40:51 2007
New Revision: 527836

URL: http://svn.apache.org/viewvc?view=rev&rev=527836
Log:
Resolved issue: DIRMINA-92 (Utility classes for asynchronous request-response protocols.)
* Added org.apache.mina.filter.reqres package
* Fixed a trivial bug in StreamWriteFilterTest

Added:
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java   (with props)
    mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/
    mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java   (with props)
Modified:
    mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,114 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class Request {
+    private final Object id;
+    private final Object message;
+    private final long timeoutMillis;
+    private TimerTask timerTask;
+    
+    public Request(Object id, Object message, long timeoutMillis) {
+        this(id, message, timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+    
+    public Request(Object id, Object message, long timeout, TimeUnit unit) {
+        if (id == null) {
+            throw new NullPointerException("id");
+        }
+        if (message == null) {
+            throw new NullPointerException("message");
+        }
+        if (timeout < 0) {
+            throw new IllegalArgumentException(
+                    "timeout: " + timeout + " (expected: 0+)");
+        } else if (timeout == 0) {
+            timeout = Long.MAX_VALUE;
+        }
+
+        if (unit == null) {
+            throw new NullPointerException("unit");
+        }
+        
+        this.id = id;
+        this.message = message;
+        this.timeoutMillis = unit.toMillis(timeout);
+    }
+    
+    public Object getId() {
+        return id;
+    }
+
+    public Object getMessage() {
+        return message;
+    }
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+    
+    @Override
+    public int hashCode() {
+        return getId().hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        
+        if (o == null) {
+            return false;
+        }
+        
+        if (!(o instanceof Request)) {
+            return false;
+        }
+        
+        Request that = (Request) o;
+        return this.getId().equals(that.getId());
+    }
+
+    @Override
+    public String toString() {
+        String timeout = (getTimeoutMillis() == Long.MAX_VALUE)?
+                "max" : String.valueOf(getTimeoutMillis());
+
+        return "request: { id=" + getId() +
+               ", timeout=" + timeout + ", message=" + getMessage() + " }";
+    }
+    
+    TimerTask getTimerTask() {
+        return timerTask;
+    }
+    
+    void setTimerTask(TimerTask timerTask) {
+        this.timerTask = timerTask;
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Request.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,269 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.util.SessionLog;
+
+/**
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class RequestResponseFilter extends IoFilterAdapter {
+
+    private static final String REQUEST_STORE = RequestResponseFilter.class.getName() + ".requestStore";
+    private static final String UNFINISHED_TASKS = RequestResponseFilter.class.getName() + ".unfinishedTasks";
+
+    private static int timerId = 0;
+    
+    private final ResponseInspector responseInspector;
+    private final Timer timer = new Timer("RequestTimer-" + (timerId++), true);
+    
+    public RequestResponseFilter(ResponseInspector responseInspector) {
+        if (responseInspector == null) {
+            throw new NullPointerException("responseInspector");
+        }
+        this.responseInspector = responseInspector;
+    }
+    
+    @Override
+    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+        IoSession session = parent.getSession();
+        session.setAttribute(REQUEST_STORE, new ConcurrentHashMap<Object, Request>());
+        session.setAttribute(UNFINISHED_TASKS, new LinkedHashSet<TimerTask>());
+    }
+    
+    @Override
+    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
+        IoSession session = parent.getSession();
+        session.removeAttribute(UNFINISHED_TASKS);
+        session.removeAttribute(REQUEST_STORE);
+    }
+    
+    /**
+     * Stops the timer thread this filter is using for processing request timeout.
+     */
+    @Override
+    public void destroy() {
+        timer.cancel();
+    }
+
+    @Override
+    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
+        Object requestId = responseInspector.getRequestId(message);
+        if (requestId == null) {
+            // Not a response message.  Ignore.
+            nextFilter.messageReceived(session, message);
+            return;
+        }
+        
+        // Retrieve (or remove) the corresponding request.
+        ResponseType type = responseInspector.getResponseType(message);
+        if (type == null) {
+            nextFilter.exceptionCaught(
+                    session,
+                    new IllegalStateException(
+                            responseInspector.getClass().getName() +
+                            "#getResponseType() may not return null."));
+        }
+        
+        Map<Object, Request> requestStore = getRequestStore(session);
+        
+        Request request;
+        switch (type) {
+        case WHOLE:
+        case PARTIAL_LAST:
+            request = requestStore.remove(requestId);
+            break;
+        case PARTIAL:
+            request = requestStore.get(requestId);
+            break;
+        default:
+            throw new InternalError();
+        }
+
+        if (request == null) {
+            // A response message without request. Swallow the event because
+            // the response might have arrived too late.
+            if (SessionLog.isDebugEnabled(session)) {
+                SessionLog.debug(
+                        session,
+                        "Unknown request ID '" + requestId + 
+                        "' for the response message. Timed out already?: " + message);
+            }
+        } else {
+            // Found a matching request.
+            // Cancel the timeout task if needed.
+            if (type != ResponseType.PARTIAL) {
+                TimerTask task = request.getTimerTask();
+                if (task != null) {
+                    task.cancel();
+                }
+            }
+
+            // And forward the event.
+            nextFilter.messageReceived(session, new Response(request, message, type));
+        }
+    }
+
+    @Override
+    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+        Object message = writeRequest.getMessage();
+        if (!(message instanceof Request)) {
+            nextFilter.filterWrite(session, writeRequest);
+            return;
+        }
+        
+        Request request = (Request) message;
+        ConcurrentMap<Object, Request> requestStore = getRequestStore(session);
+        Object oldValue = requestStore.putIfAbsent(request.getId(), request);
+        if (oldValue != null) {
+            nextFilter.exceptionCaught(
+                    session,
+                    new IllegalStateException("Duplicate request ID: " + request.getId()));
+        }
+        
+        nextFilter.filterWrite(session, new RequestWriteRequest(writeRequest));
+    }
+
+    @Override
+    public void messageSent(final NextFilter nextFilter, final IoSession session, WriteRequest writeRequest) throws Exception {
+        if (writeRequest instanceof RequestWriteRequest) {
+            // Schedule a task to be executed on timeout.
+            RequestWriteRequest wrappedRequest = (RequestWriteRequest) writeRequest;
+            WriteRequest actualRequest = wrappedRequest.getWriteRequest();
+            final Request request = (Request) actualRequest.getMessage();
+            
+            // Find the timeout date avoiding overflow.
+            Date timeoutDate = new Date(System.currentTimeMillis());
+            if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate.getTime()) {
+                timeoutDate.setTime(Long.MAX_VALUE);
+            } else {
+                timeoutDate.setTime(timeoutDate.getTime() + request.getTimeoutMillis());
+            }
+            
+            TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
+            request.setTimerTask(timeoutTask);
+            
+            // Add the timtoue task to the unfinished task set.
+            Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+            synchronized (unfinishedTasks) {
+                unfinishedTasks.add(timeoutTask);
+            }
+            
+            // Schedule the timeout task.
+            timer.schedule(timeoutTask, timeoutDate);
+            
+            // and forward the original write request.
+            nextFilter.messageSent(session, wrappedRequest.getWriteRequest());
+        } else {
+            nextFilter.messageSent(session, writeRequest);
+        }
+    }
+
+    @Override
+    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+        // Copy the unifished task set to avoid unnecessary lock acquisition.
+        // Copying will be cheap because there won't be that many requests queued.
+        Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+        Collection<TimerTask> unfinishedTasksCopy;
+        synchronized (unfinishedTasks) {
+            unfinishedTasksCopy = new ArrayList<TimerTask>(unfinishedTasks);
+            unfinishedTasks.clear();
+        }
+        
+        // Generate timeout artifically.
+        for (TimerTask task: unfinishedTasksCopy) {
+            if (task.cancel()) {
+                task.run();
+            }
+        }
+
+        // Clear the request store just in case we missed something, though it's unlikely.
+        getRequestStore(session).clear();
+        
+        // Now tell the main subject.
+        nextFilter.sessionClosed(session);
+    }
+
+    @SuppressWarnings("unchecked")
+    private ConcurrentMap<Object, Request> getRequestStore(IoSession session) {
+        return (ConcurrentMap<Object, Request>) session.getAttribute(REQUEST_STORE);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Set<TimerTask> getUnfinishedTasks(IoSession session) {
+        return (Set<TimerTask>) session.getAttribute(UNFINISHED_TASKS);
+    }
+    
+    private class TimeoutTask extends TimerTask {
+        private final NextFilter filter;
+        private final Request request;
+        private final IoSession session;
+
+        private TimeoutTask(NextFilter filter, Request request, IoSession session) {
+            this.filter = filter;
+            this.request = request;
+            this.session = session;
+        }
+
+        @Override
+        public void run() {
+            Set<TimerTask> unfinishedTasks = getUnfinishedTasks(session);
+            if (unfinishedTasks != null) {
+                synchronized (unfinishedTasks) {
+                    unfinishedTasks.remove(this);
+                }
+            }
+        
+            ConcurrentMap<Object, Request> requestStore = getRequestStore(session);
+            if (requestStore.remove(request.getId(), request)) {
+                // Throw the exception only when it's really timed out.
+                filter.exceptionCaught(session, new RequestTimeoutException(request));
+            }
+        }
+    }
+
+    private static class RequestWriteRequest extends WriteRequestWrapper {
+        public RequestWriteRequest(WriteRequest writeRequest) {
+            super(writeRequest);
+        }
+        
+        public Object getMessage() {
+            return ((Request) super.getMessage()).getMessage();
+        }
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} which is thrown when a {@link Request} is timed out.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$, 
+ */
+public class RequestTimeoutException extends IOException {
+    private static final long serialVersionUID = 5546784978950631652L;
+
+    private final Request request;
+    
+    /**
+     * Creates a new exception.
+     */
+    public RequestTimeoutException(Request request) {
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        this.request = request;
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public RequestTimeoutException(Request request, String s) {
+        super(s);
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        this.request = request;
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public RequestTimeoutException(Request request, String message, Throwable cause) {
+        super(message, cause);
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        this.request = request;
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public RequestTimeoutException(Request request, Throwable cause) {
+        super(cause);
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        this.request = request;
+    }
+    
+    /**
+     * Returns the request which has timed out.
+     */
+    public Request getRequest() {
+        return request;
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestTimeoutException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class Response {
+    private final Request request;
+    private final ResponseType type;
+    private final Object message;
+
+    public Response(Request request, Object message, ResponseType type) {
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        
+        if (message == null) {
+            throw new NullPointerException("message");
+        }
+
+        if (type == null) {
+            throw new NullPointerException("type");
+        }
+        
+        this.request = request;
+        this.type = type;
+        this.message = message;
+    }
+    
+    public Request getRequest() {
+        return request;
+    }
+    
+    public ResponseType getType() {
+        return type;
+    }
+    
+    public Object getMessage() {
+        return message;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRequest().getId().hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        
+        if (o == null) {
+            return false;
+        }
+        
+        if (!(o instanceof Response)) {
+            return false;
+        }
+        
+        Response that = (Response) o;
+        if (!this.getRequest().equals(that.getRequest())) {
+            return false;
+        }
+        
+        return this.getType().equals(that.getType());
+    }
+    
+    @Override
+    public String toString() {
+        return "response: { requestId=" + getRequest().getId() +
+               ", type=" + getType() + ", message=" + getMessage() + " }";
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/Response.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ResponseInspector {
+    Object getRequestId(Object message);
+    ResponseType getResponseType(Object message);
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseInspector.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+/**
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public enum ResponseType {
+    WHOLE, PARTIAL, PARTIAL_LAST;
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/ResponseType.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java?view=diff&rev=527836&r1=527835&r2=527836
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/StreamWriteFilterTest.java Thu Apr 12 01:40:51 2007
@@ -546,7 +546,7 @@
         @Override
         protected boolean argumentMatches( Object expected, Object actual )
         {
-            if( expected instanceof WriteRequest && expected instanceof WriteRequest )
+            if( expected instanceof WriteRequest && actual instanceof WriteRequest )
             {
                 WriteRequest w1 = ( WriteRequest ) expected;
                 WriteRequest w2 = ( WriteRequest ) actual;

Added: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java?view=auto&rev=527836
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java (added)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java Thu Apr 12 01:40:51 2007
@@ -0,0 +1,348 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.filter.reqres;
+
+import java.net.SocketAddress;
+
+import org.apache.mina.common.DefaultWriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.IoFilter.NextFilter;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.util.SessionLog;
+import org.easymock.AbstractMatcher;
+import org.easymock.MockControl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link RequestResponseFilter}.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class RequestResponseFilterTest {
+    
+    private RequestResponseFilter filter;
+    private IoSession session;
+    
+    private IoFilterChain chain;
+    private NextFilter nextFilter;
+    private MockControl nextFilterControl;
+    
+    private final WriteRequestMatcher matcher = new WriteRequestMatcher();
+    
+    @Before
+    public void setUp() throws Exception {
+        session = new DummySession();
+        filter = new RequestResponseFilter(new MessageInspector());
+        
+        // Set up mock objects.
+        chain = new AbstractIoFilterChain(session) {
+            @Override
+            protected void doClose(IoSession session) throws Exception {
+            }
+
+            @Override
+            protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception {
+            }
+        };
+        nextFilterControl = MockControl.createControl(NextFilter.class);
+        nextFilter = (NextFilter) nextFilterControl.getMock();
+        
+        // Initialize the filter.
+        filter.onPreAdd(chain, "reqres", nextFilter);
+        filter.onPostAdd(chain, "reqres", nextFilter);
+        Assert.assertFalse(session.getAttributeKeys().isEmpty());
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        // Destroy the filter.
+        filter.onPreRemove(chain, "reqres", nextFilter);
+        filter.onPostRemove(chain, "reqres", nextFilter);
+        session.removeAttribute(SessionLog.LOGGER);
+        Assert.assertTrue(session.getAttributeKeys().isEmpty());
+
+        filter.destroy();
+        filter = null;
+    }
+    
+    @Test
+    public void testWholeResponse() throws Exception {
+        Request req = new Request(1, new Object(), Long.MAX_VALUE);
+        Response res = new Response(req, new Message(1, ResponseType.WHOLE), ResponseType.WHOLE);
+        WriteRequest rwr = new DefaultWriteRequest(req);
+        
+        // Record
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+        nextFilterControl.setMatcher(matcher);
+        nextFilter.messageSent(session, rwr);
+        nextFilter.messageReceived(session, res);
+        
+        // Replay
+        nextFilterControl.replay();
+        filter.filterWrite(nextFilter, session, rwr);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        filter.messageReceived(nextFilter, session, res.getMessage());
+        filter.messageReceived(nextFilter, session, res.getMessage()); // Ignored
+        
+        // Verify
+        nextFilterControl.verify();
+    }
+    
+    @Test
+    public void testPartialResponse() throws Exception {
+        Request req = new Request(1, new Object(), Long.MAX_VALUE);
+        Response res1 = new Response(req, new Message(1, ResponseType.PARTIAL), ResponseType.PARTIAL);
+        Response res2 = new Response(req, new Message(1, ResponseType.PARTIAL_LAST), ResponseType.PARTIAL_LAST);
+        WriteRequest rwr = new DefaultWriteRequest(req);
+        
+        // Record
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+        nextFilterControl.setMatcher(matcher);
+        nextFilter.messageSent(session, rwr);
+        nextFilter.messageReceived(session, res1);
+        nextFilter.messageReceived(session, res2);
+        
+        // Replay
+        nextFilterControl.replay();
+        filter.filterWrite(nextFilter, session, rwr);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        filter.messageReceived(nextFilter, session, res1.getMessage());
+        filter.messageReceived(nextFilter, session, res2.getMessage());
+        filter.messageReceived(nextFilter, session, res1.getMessage()); // Ignored
+        filter.messageReceived(nextFilter, session, res2.getMessage()); // Ignored
+        
+        // Verify
+        nextFilterControl.verify();
+    }
+    
+    @Test
+    public void testWholeResponseTimeout() throws Exception {
+        Request req = new Request(1, new Object(), 10);  // 10ms timeout
+        Response res = new Response(req, new Message(1, ResponseType.WHOLE), ResponseType.WHOLE);
+        WriteRequest rwr = new DefaultWriteRequest(req);
+        
+        // Record
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+        nextFilterControl.setMatcher(matcher);
+        nextFilter.messageSent(session, rwr);
+        nextFilter.exceptionCaught(session, new RequestTimeoutException(req));
+        nextFilterControl.setMatcher(new ExceptionMatcher());
+        
+        // Replay
+        nextFilterControl.replay();
+        filter.filterWrite(nextFilter, session, rwr);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        Thread.sleep(300);  // Wait until the request times out.
+        filter.messageReceived(nextFilter, session, res.getMessage()); // Ignored
+        
+        // Verify
+        nextFilterControl.verify();
+    }
+    
+    @Test
+    public void testPartialResponseTimeout() throws Exception {
+        Request req = new Request(1, new Object(), 10);  // 10ms timeout
+        Response res1 = new Response(req, new Message(1, ResponseType.PARTIAL), ResponseType.PARTIAL);
+        Response res2 = new Response(req, new Message(1, ResponseType.PARTIAL_LAST), ResponseType.PARTIAL_LAST);
+        WriteRequest rwr = new DefaultWriteRequest(req);
+        
+        // Record
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req.getMessage()));
+        nextFilterControl.setMatcher(matcher);
+        nextFilter.messageSent(session, rwr);
+        nextFilter.messageReceived(session, res1);
+        nextFilter.exceptionCaught(session, new RequestTimeoutException(req));
+        nextFilterControl.setMatcher(new ExceptionMatcher());
+        
+        // Replay
+        nextFilterControl.replay();
+        filter.filterWrite(nextFilter, session, rwr);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        filter.messageReceived(nextFilter, session, res1.getMessage());
+        Thread.sleep(300);  // Wait until the request times out.
+        filter.messageReceived(nextFilter, session, res2.getMessage()); // Ignored
+        filter.messageReceived(nextFilter, session, res1.getMessage()); // Ignored
+        
+        // Verify
+        nextFilterControl.verify();
+    }
+    
+    @Test
+    public void testTimeoutByDisconnection() throws Exception {
+        // We run a test case that doesn't raise a timeout to make sure
+        // the timeout is not raised again by disconnection.
+        testWholeResponse();
+        nextFilterControl.reset();
+        
+        Request req1 = new Request(1, new Object(), Long.MAX_VALUE);
+        Request req2 = new Request(2, new Object(), Long.MAX_VALUE);
+        WriteRequest rwr1 = new DefaultWriteRequest(req1);
+        WriteRequest rwr2 = new DefaultWriteRequest(req2);
+        
+        // Record
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req1.getMessage()));
+        nextFilterControl.setMatcher(matcher);
+        nextFilter.messageSent(session, rwr1);
+        nextFilter.filterWrite(session, new DefaultWriteRequest(req2.getMessage()));
+        nextFilter.messageSent(session, rwr2);
+        nextFilter.exceptionCaught(session, new RequestTimeoutException(req1));
+        nextFilterControl.setMatcher(new ExceptionMatcher());
+        nextFilter.exceptionCaught(session, new RequestTimeoutException(req2));
+        nextFilter.sessionClosed(session);
+        
+        // Replay
+        nextFilterControl.replay();
+        filter.filterWrite(nextFilter, session, rwr1);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        filter.filterWrite(nextFilter, session, rwr2);
+        filter.messageSent(nextFilter, session, matcher.getLastWriteRequest());
+        filter.sessionClosed(nextFilter, session);
+        
+        // Verify
+        nextFilterControl.verify();
+    }
+    
+    private static class Message {
+        private final int id;
+        private final ResponseType type;
+        
+        private Message(int id, ResponseType type) {
+            this.id = id;
+            this.type = type;
+        }
+        
+        public int getId() {
+            return id;
+        }
+        
+        public ResponseType getType() {
+            return type;
+        }
+    }
+    
+    private static class MessageInspector implements ResponseInspector {
+
+        public Object getRequestId(Object message) {
+            if (!(message instanceof Message)) {
+                return null;
+            }
+            
+            return ((Message) message).getId();
+        }
+
+        public ResponseType getResponseType(Object message) {
+            if (!(message instanceof Message)) {
+                return null;
+            }
+            
+            return ((Message) message).getType();
+        }
+    }
+    
+    private static class DummySession extends BaseIoSession {
+
+        @Override
+        protected void updateTrafficMask() {
+        }
+
+        public IoSessionConfig getConfig() {
+            return null;
+        }
+
+        public IoFilterChain getFilterChain() {
+            return null;
+        }
+
+        public IoHandler getHandler() {
+            return new IoHandlerAdapter();
+        }
+
+        public SocketAddress getLocalAddress() {
+            return null;
+        }
+
+        public SocketAddress getRemoteAddress() {
+            return null;
+        }
+
+        public int getScheduledWriteBytes() {
+            return 0;
+        }
+
+        public int getScheduledWriteMessages() {
+            return 0;
+        }
+
+        public IoService getService() {
+            return null;
+        }
+
+        public TransportType getTransportType() {
+            return null;
+        }
+    }
+    
+    private static class WriteRequestMatcher extends AbstractMatcher
+    {
+        private WriteRequest lastWriteRequest;
+        
+        public WriteRequest getLastWriteRequest() {
+            return lastWriteRequest;
+        }
+
+        @Override
+        protected boolean argumentMatches( Object expected, Object actual )
+        {
+            if( actual instanceof WriteRequest && expected instanceof WriteRequest )
+            {
+                boolean answer = ((WriteRequest) expected).getMessage().equals(((WriteRequest) actual).getMessage());
+                lastWriteRequest = (WriteRequest) actual;
+                return answer;
+            }
+            return super.argumentMatches( expected, actual );
+        }
+    }
+    
+    private static class ExceptionMatcher extends AbstractMatcher
+    {
+        @Override
+        protected boolean argumentMatches( Object expected, Object actual )
+        {
+            if( actual instanceof RequestTimeoutException && expected instanceof RequestTimeoutException )
+            {
+                return ((RequestTimeoutException) expected).getRequest().equals(
+                        ((RequestTimeoutException) actual).getRequest());
+            }
+            return super.argumentMatches( expected, actual );
+        }
+    }
+}

Propchange: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/test/java/org/apache/mina/filter/reqres/RequestResponseFilterTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date