You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/02 12:57:02 UTC

svn commit: r591310 - in /mina/trunk: core/src/main/java/org/apache/mina/common/ core/src/main/java/org/apache/mina/filter/codec/ core/src/main/java/org/apache/mina/filter/ssl/ core/src/main/java/org/apache/mina/filter/stream/ core/src/main/java/org/ap...

Author: trustin
Date: Fri Nov  2 04:57:00 2007
New Revision: 591310

URL: http://svn.apache.org/viewvc?rev=591310&view=rev
Log:
Resolved issue: DIRMINA-249 (exceptionCaught() should provide more information)
* Added WriteException which contains the list of the failed WriteRequests
* All write-related exceptions extend WriteException now.
* Added NothingWrittenException
* Added WriteToClosedSessionException
* WriteFuture also provides an 'exception' property to tell the cause of the write failure; all failed WriteFutures must provide a related exception now.
* All queued write requests which couldn't be written out due to sudden disconnection are notified as an exceptionCaught event with WriteToClosedSessionException now.
* Improved SslFilter to filter out WriteToClosedSessionException which contains a failed close_notify request.

Resolved issue: DIRMINA-302 (Unbounded nature of writeRequestQueue can cause OutOfMemoryException)
* Added WriteThrottleFilter
* Added WriteThrottlePolicy
* Added TooManyScheduledWritesException (extends WriteException)


Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java
    mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java
    mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
    mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
    mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java
    mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
    mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Fri Nov  2 04:57:00 2007
@@ -21,7 +21,9 @@
 
 import java.io.IOException;
 import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
@@ -383,6 +385,8 @@
     private void clearWriteRequestQueue(AbstractIoSession session) {
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
+        
+        List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
 
         if ((req = writeRequestQueue.poll()) != null) {
             Object m = req.getMessage();
@@ -392,18 +396,27 @@
                 // The first unwritten empty buffer must be
                 // forwarded to the filter chain.
                 if (buf.hasRemaining()) {
-                    req.getFuture().setWritten(false);
+                    failedRequests.add(req);
                 } else {
                     session.getFilterChain().fireMessageSent(req);
                 }
             } else {
-                req.getFuture().setWritten(false);
+                failedRequests.add(req);
             }
 
             // Discard others.
             while ((req = writeRequestQueue.poll()) != null) {
-                req.getFuture().setWritten(false);
+                failedRequests.add(req);
             }
+        }
+        
+        // Create an exception and notify.
+        if (!failedRequests.isEmpty()) {
+            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+            for (WriteRequest r: failedRequests) {
+                r.getFuture().setException(cause);
+            }
+            session.getFilterChain().fireExceptionCaught(cause);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Nov  2 04:57:00 2007
@@ -208,7 +208,10 @@
         }
 
         if (isClosing() || !isConnected()) {
-            return DefaultWriteFuture.newNotWrittenFuture(this);
+            WriteFuture future = new DefaultWriteFuture(this);
+            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
+            future.setException(new WriteToClosedSessionException(request));
+            return future;
         }
 
         FileChannel channel = null;
@@ -222,7 +225,7 @@
                 message = new DefaultFileRegion(channel, 0, channel.size());
             } catch (IOException e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
-                return DefaultWriteFuture.newNotWrittenFuture(this);
+                return DefaultWriteFuture.newNotWrittenFuture(this, e);
             }
         } else if (message instanceof File) {
             File file = (File) message;
@@ -230,7 +233,7 @@
                 channel = new FileInputStream(file).getChannel();
             } catch (IOException e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
-                return DefaultWriteFuture.newNotWrittenFuture(this);
+                return DefaultWriteFuture.newNotWrittenFuture(this, e);
             }
         }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Fri Nov  2 04:57:00 2007
@@ -417,7 +417,7 @@
         }
 
         try {
-            request.getFuture().setWritten(true);
+            request.getFuture().setWritten();
         } catch (Throwable t) {
             fireExceptionCaught(t);
         }
@@ -471,7 +471,7 @@
             entry.getFilter().filterWrite(entry.getNextFilter(), session,
                     writeRequest);
         } catch (Throwable e) {
-            writeRequest.getFuture().setWritten(false);
+            writeRequest.getFuture().setException(e);
             fireExceptionCaught(e);
         }
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java Fri Nov  2 04:57:00 2007
@@ -32,16 +32,16 @@
      */
     public static WriteFuture newWrittenFuture(IoSession session) {
         DefaultWriteFuture unwrittenFuture = new DefaultWriteFuture(session);
-        unwrittenFuture.setWritten(true);
+        unwrittenFuture.setWritten();
         return unwrittenFuture;
     }
 
     /**
      * Returns a new {@link DefaultWriteFuture} which is already marked as 'not written'.
      */
-    public static WriteFuture newNotWrittenFuture(IoSession session) {
+    public static WriteFuture newNotWrittenFuture(IoSession session, Throwable cause) {
         DefaultWriteFuture unwrittenFuture = new DefaultWriteFuture(session);
-        unwrittenFuture.setWritten(false);
+        unwrittenFuture.setException(cause);
         return unwrittenFuture;
     }
 
@@ -54,13 +54,30 @@
 
     public boolean isWritten() {
         if (isReady()) {
-            return ((Boolean) getValue()).booleanValue();
+            Object v = getValue();
+            if (v instanceof Boolean) {
+                return ((Boolean) v).booleanValue();
+            }
         }
         return false;
     }
+    
+    public Throwable getException() {
+        if (isReady()) {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            }
+        }
+        return null;
+    }
 
-    public void setWritten(boolean written) {
-        setValue(written);
+    public void setWritten() {
+        setValue(Boolean.TRUE);
+    }
+    
+    public void setException(Throwable cause) {
+        setValue(cause);
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java Fri Nov  2 04:57:00 2007
@@ -34,7 +34,7 @@
             return false;
         }
 
-        public void setWritten(boolean written) {
+        public void setWritten() {
         }
 
         public IoSession getSession() {
@@ -85,6 +85,13 @@
 
         public boolean awaitUninterruptibly(long timeoutMillis) {
             return true;
+        }
+
+        public Throwable getException() {
+            return null;
+        }
+
+        public void setException(Throwable cause) {
         }
     };
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Fri Nov  2 04:57:00 2007
@@ -21,6 +21,7 @@
 
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.Set;
 
 /**
@@ -130,10 +131,22 @@
 
     private static void notifyWriteTimeout(IoSession session,
             long currentTime, long writeTimeout, long lastIoTime) {
-        if (session instanceof AbstractIoSession &&
-                writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
-                !((AbstractIoSession) session).getWriteRequestQueue().isEmpty()) {
-            session.getFilterChain().fireExceptionCaught(new WriteTimeoutException());
+        if (!(session instanceof AbstractIoSession)) {
+            return;
+        }
+
+        AbstractIoSession s = (AbstractIoSession) session;
+        if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
+                !s.getWriteRequestQueue().isEmpty()) {
+            Queue<WriteRequest> queue = s.getWriteRequestQueue();
+            WriteRequest request = queue.peek();
+            if (request != null) {
+                WriteTimeoutException cause = new WriteTimeoutException(request);
+                queue.poll().getFuture().setException(cause);
+                s.getFilterChain().fireExceptionCaught(cause);
+                // WriteException is an IOException, so we close the session.
+                s.close();
+            }
         }
     }
 }

Added: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.Collection;
+
+/**
+ * An exception which is thrown when one or more write requests resulted
+ * in no actual write operation.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class NothingWrittenException extends WriteException {
+
+    private static final long serialVersionUID = -6331979307737691005L;
+
+    public NothingWrittenException(Collection<WriteRequest> requests,
+            String message, Throwable cause) {
+        super(requests, message, cause);
+    }
+
+    public NothingWrittenException(Collection<WriteRequest> requests, String s) {
+        super(requests, s);
+    }
+
+    public NothingWrittenException(Collection<WriteRequest> requests,
+            Throwable cause) {
+        super(requests, cause);
+    }
+
+    public NothingWrittenException(Collection<WriteRequest> requests) {
+        super(requests);
+    }
+
+    public NothingWrittenException(WriteRequest request, String message,
+            Throwable cause) {
+        super(request, message, cause);
+    }
+
+    public NothingWrittenException(WriteRequest request, String s) {
+        super(request, s);
+    }
+
+    public NothingWrittenException(WriteRequest request, Throwable cause) {
+        super(request, cause);
+    }
+
+    public NothingWrittenException(WriteRequest request) {
+        super(request);
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,151 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * An exception which is thrown when one or more write operations were failed.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class WriteException extends IOException {
+
+    private static final long serialVersionUID = -4174407422754524197L;
+    
+    private final List<WriteRequest> requests;
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(WriteRequest request) {
+        super();
+        this.requests = asRequestList(request);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(WriteRequest request, String s) {
+        super(s);
+        this.requests = asRequestList(request);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(WriteRequest request, String message, Throwable cause) {
+        super(message);
+        initCause(cause);
+        this.requests = asRequestList(request);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(WriteRequest request, Throwable cause) {
+        initCause(cause);
+        this.requests = asRequestList(request);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(Collection<WriteRequest> requests) {
+        super();
+        this.requests = asRequestList(requests);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(Collection<WriteRequest> requests, String s) {
+        super(s);
+        this.requests = asRequestList(requests);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(Collection<WriteRequest> requests, String message, Throwable cause) {
+        super(message);
+        initCause(cause);
+        this.requests = asRequestList(requests);
+    }
+
+    /**
+     * Creates a new exception.
+     */
+    public WriteException(Collection<WriteRequest> requests, Throwable cause) {
+        initCause(cause);
+        this.requests = asRequestList(requests);
+    }
+
+    /**
+     * Returns the list of the failed {@link WriteRequest}, in the order of occurrance.
+     */
+    public List<WriteRequest> getRequests() {
+        return requests;
+    }
+
+    /**
+     * Returns the firstly failed {@link WriteRequest}. 
+     */
+    public WriteRequest getRequest() {
+        return requests.get(0);
+    }
+    
+    private static List<WriteRequest> asRequestList(Collection<WriteRequest> requests) {
+        if (requests == null) {
+            throw new NullPointerException("requests");
+        }
+        if (requests.isEmpty()) {
+            throw new IllegalArgumentException("requests is empty.");
+        }
+
+        List<WriteRequest> newRequests = new ArrayList<WriteRequest>(requests.size());
+        for (WriteRequest r: requests) {
+            newRequests.add(unwrapRequest(r));
+        }
+        return Collections.unmodifiableList(newRequests);
+    }
+
+    private static List<WriteRequest> asRequestList(WriteRequest request) {
+        if (request == null) {
+            throw new NullPointerException("request");
+        }
+        
+        List<WriteRequest> requests = new ArrayList<WriteRequest>(1);
+        requests.add(unwrapRequest(request));
+        return Collections.unmodifiableList(requests);
+    }
+
+    private static WriteRequest unwrapRequest(WriteRequest request) {
+        while (request instanceof WriteRequestWrapper) {
+            request = ((WriteRequestWrapper) request).getWriteRequest();
+        }
+        return request;
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java Fri Nov  2 04:57:00 2007
@@ -47,13 +47,27 @@
      * Returns <tt>true</tt> if the write operation is finished successfully.
      */
     boolean isWritten();
+    
+    /**
+     * Returns the cause of the write failure if and only if the write
+     * operation has failed due to an {@link Exception}.  Otherwise,
+     * <tt>null</tt> is returned.
+     */
+    Throwable getException();
 
     /**
-     * Sets whether the message is written or not, and notifies all threads
-     * waiting for this future.  This method is invoked by MINA internally.
-     * Please do not call this method directly.
+     * Sets the message is written, and notifies all threads waiting for
+     * this future.  This method is invoked by MINA internally.  Please do
+     * not call this method directly.
+     */
+    void setWritten();
+    
+    /**
+     * Sets the cause of the write failure, and notifies all threads waiting
+     * for this future.  This method is invoked by MINA internally.  Please
+     * do not call this method directly.
      */
-    void setWritten(boolean written);
+    void setException(Throwable cause);
 
     WriteFuture await() throws InterruptedException;
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java Fri Nov  2 04:57:00 2007
@@ -19,44 +19,51 @@
  */
 package org.apache.mina.common;
 
-import java.io.IOException;
+import java.util.Collection;
+
 
 /**
- * An {@link IOException} which is thrown when write buffer is not flushed for
+ * An exception which is thrown when write buffer is not flushed for
  * {@link IoSessionConfig#getWriteTimeout()} seconds.
  *
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$,
  */
-public class WriteTimeoutException extends IOException {
+public class WriteTimeoutException extends WriteException {
     private static final long serialVersionUID = 3906931157944579121L;
 
-    /**
-     * Creates a new exception.
-     */
-    public WriteTimeoutException() {
-        super();
-    }
-
-    /**
-     * Creates a new exception.
-     */
-    public WriteTimeoutException(String s) {
-        super(s);
-    }
-
-    /**
-     * Creates a new exception.
-     */
-    public WriteTimeoutException(String message, Throwable cause) {
-        super(message);
-        initCause(cause);
-    }
-
-    /**
-     * Creates a new exception.
-     */
-    public WriteTimeoutException(Throwable cause) {
-        initCause(cause);
+    public WriteTimeoutException(Collection<WriteRequest> requests,
+            String message, Throwable cause) {
+        super(requests, message, cause);
+    }
+
+    public WriteTimeoutException(Collection<WriteRequest> requests, String s) {
+        super(requests, s);
+    }
+
+    public WriteTimeoutException(Collection<WriteRequest> requests,
+            Throwable cause) {
+        super(requests, cause);
+    }
+
+    public WriteTimeoutException(Collection<WriteRequest> requests) {
+        super(requests);
+    }
+
+    public WriteTimeoutException(WriteRequest request, String message,
+            Throwable cause) {
+        super(request, message, cause);
+    }
+
+    public WriteTimeoutException(WriteRequest request, String s) {
+        super(request, s);
+    }
+
+    public WriteTimeoutException(WriteRequest request, Throwable cause) {
+        super(request, cause);
+    }
+
+    public WriteTimeoutException(WriteRequest request) {
+        super(request);
     }
 }

Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,70 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.Collection;
+
+/**
+ * An exception which is thrown when one or more write operations were
+ * attempted on a closed session.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class WriteToClosedSessionException extends WriteException {
+
+    private static final long serialVersionUID = 5550204573739301393L;
+
+    public WriteToClosedSessionException(Collection<WriteRequest> requests,
+            String message, Throwable cause) {
+        super(requests, message, cause);
+    }
+
+    public WriteToClosedSessionException(Collection<WriteRequest> requests,
+            String s) {
+        super(requests, s);
+    }
+
+    public WriteToClosedSessionException(Collection<WriteRequest> requests,
+            Throwable cause) {
+        super(requests, cause);
+    }
+
+    public WriteToClosedSessionException(Collection<WriteRequest> requests) {
+        super(requests);
+    }
+
+    public WriteToClosedSessionException(WriteRequest request, String message,
+            Throwable cause) {
+        super(request, message, cause);
+    }
+
+    public WriteToClosedSessionException(WriteRequest request, String s) {
+        super(request, s);
+    }
+
+    public WriteToClosedSessionException(WriteRequest request, Throwable cause) {
+        super(request, cause);
+    }
+
+    public WriteToClosedSessionException(WriteRequest request) {
+        super(request);
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Fri Nov  2 04:57:00 2007
@@ -31,6 +31,7 @@
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.NothingWrittenException;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestWrapper;
@@ -223,7 +224,7 @@
 
         try {
             encoder.encode(session, message, encoderOut);
-            encoderOut.flush();
+            encoderOut.flushWithoutFuture();
             nextFilter.filterWrite(session, new MessageWriteRequest(
                     writeRequest));
         } catch (Throwable t) {
@@ -403,10 +404,28 @@
             }
 
             if (future == null) {
-                future = DefaultWriteFuture.newNotWrittenFuture(session);
+                future = DefaultWriteFuture.newNotWrittenFuture(
+                        session, new NothingWrittenException(writeRequest));
             }
 
             return future;
+        }
+        
+        public void flushWithoutFuture() {
+            Queue<IoBuffer> bufferQueue = getBufferQueue();
+            for (;;) {
+                IoBuffer buf = bufferQueue.poll();
+                if (buf == null) {
+                    break;
+                }
+
+                // Flush only when the buffer has remaining.
+                if (buf.hasRemaining()) {
+                    nextFilter.filterWrite(
+                            session, new EncodedWriteRequest(
+                                    buf, null, writeRequest.getDestination()));
+                }
+            }
         }
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java Fri Nov  2 04:57:00 2007
@@ -60,7 +60,7 @@
 public class ProtocolCodecSession extends DummySession {
 
     private final WriteFuture notWrittenFuture =
-        DefaultWriteFuture.newNotWrittenFuture(this);
+        DefaultWriteFuture.newNotWrittenFuture(this, new UnsupportedOperationException());
 
     private final AbstractProtocolEncoderOutput encoderOutput =
         new AbstractProtocolEncoderOutput() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java Fri Nov  2 04:57:00 2007
@@ -20,6 +20,8 @@
 package org.apache.mina.filter.ssl;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -40,6 +42,7 @@
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.common.WriteToClosedSessionException;
 
 /**
  * An SSL filter that encrypts and decrypts the data exchanged in the session.
@@ -464,6 +467,63 @@
     }
 
     @Override
+    public void exceptionCaught(NextFilter nextFilter, IoSession session,
+            Throwable cause) throws Exception {
+
+        if (cause instanceof WriteToClosedSessionException) {
+            // Filter out SSL close notify, which is likely to fail to flush
+            // due to disconnection.
+            WriteToClosedSessionException e = (WriteToClosedSessionException) cause;
+            List<WriteRequest> failedRequests = e.getRequests();
+            boolean containsCloseNotify = false;
+            for (WriteRequest r: failedRequests) {
+                if (isCloseNotify(r.getMessage())) {
+                    containsCloseNotify = true;
+                    break;
+                }
+            }
+            
+            if (containsCloseNotify) {
+                if (failedRequests.size() == 1) {
+                    // close notify is the only failed request; bail out.
+                    return;
+                }
+                
+                List<WriteRequest> newFailedRequests =
+                    new ArrayList<WriteRequest>(failedRequests.size() - 1);
+                for (WriteRequest r: failedRequests) {
+                    if (!isCloseNotify(r.getMessage())) {
+                        newFailedRequests.add(r);
+                    }
+                }
+                
+                if (newFailedRequests.isEmpty()) {
+                    // the failedRequests were full with close notify; bail out.
+                    return;
+                }
+                
+                cause = new WriteToClosedSessionException(
+                        newFailedRequests, cause.getMessage(), cause.getCause());
+            }
+        }
+        
+        nextFilter.exceptionCaught(session, cause);
+    }
+        
+    private boolean isCloseNotify(Object message) { // heuristic: 23-byte IoBuffer starting with 15 03 01 00 12
+        if (!(message instanceof IoBuffer)) {
+            return false;
+        }
+        // NOTE(review): the constants below presumably describe a TLS 1.0 alert record - confirm.
+        IoBuffer buf = (IoBuffer) message;
+        int offset = buf.position();
+        return buf.remaining() == 23 && // 23 = the 5 bytes checked below + 0x12 (18) further bytes
+               buf.get(offset + 0) == 0x15 && buf.get(offset + 1) == 0x03 && // presumably: alert content type, major version 3
+               buf.get(offset + 2) == 0x01 && buf.get(offset + 3) == 0x00 && // presumably: minor version 1, length high byte
+               buf.get(offset + 4) == 0x12; // presumably: length low byte (18)
+    }
+
+    @Override
     public void filterWrite(NextFilter nextFilter, IoSession session,
             WriteRequest writeRequest) throws SSLException {
         boolean needsFlush = true;
@@ -582,11 +642,15 @@
         SslHandler handler = getSslSessionHandler(session);
         // if already shut down
         if (!handler.closeOutbound()) {
-            return DefaultWriteFuture.newNotWrittenFuture(session);
+            return DefaultWriteFuture.newNotWrittenFuture(
+                    session, new IllegalStateException("SSL session is shut down already."));
         }
 
         // there might be data to write out here?
         WriteFuture future = handler.writeNetBuffer(nextFilter);
+        if (future == null) {
+            future = DefaultWriteFuture.newWrittenFuture(session);
+        }
 
         if (handler.isInboundDone()) {
             handler.destroy();

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java Fri Nov  2 04:57:00 2007
@@ -540,7 +540,7 @@
         // Check if any net data needed to be writen
         if (!getOutNetBuffer().hasRemaining()) {
             // no; bail out
-            return DefaultWriteFuture.newNotWrittenFuture(session);
+            return null;
         }
 
         // set flag that we are writing encrypted data
@@ -555,7 +555,7 @@
                 logger.debug( " write outNetBuffer: "
                         + getOutNetBuffer());
             }
-            org.apache.mina.common.IoBuffer writeBuffer = copy(getOutNetBuffer());
+            IoBuffer writeBuffer = copy(getOutNetBuffer());
             if (logger.isDebugEnabled()) {
                 logger.debug( " session write: " + writeBuffer);
             }
@@ -580,7 +580,7 @@
                         logger.debug( " write outNetBuffer2: "
                                 + getOutNetBuffer());
                     }
-                    org.apache.mina.common.IoBuffer writeBuffer2 = copy(getOutNetBuffer());
+                    IoBuffer writeBuffer2 = copy(getOutNetBuffer());
                     writeFuture = new DefaultWriteFuture(session);
                     parent.filterWrite(nextFilter, session,
                             new DefaultWriteRequest(writeBuffer2, writeFuture));
@@ -590,11 +590,7 @@
             writingEncryptedData = false;
         }
 
-        if (writeFuture != null) {
-            return writeFuture;
-        } else {
-            return DefaultWriteFuture.newNotWrittenFuture(session);
-        }
+        return writeFuture;
     }
 
     private void unwrap(NextFilter nextFilter) throws SSLException {

Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java Fri Nov  2 04:57:00 2007
@@ -31,6 +31,7 @@
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.IoFilter.NextFilter;
 
 /**
  * Filter implementation which makes it possible to write {@link InputStream}
@@ -105,7 +106,7 @@
             IoBuffer buffer = getNextBuffer(inputStream);
             if (buffer == null) {
                 // End of stream reached.
-                writeRequest.getFuture().setWritten(true);
+                writeRequest.getFuture().setWritten();
                 nextFilter.messageSent(session, writeRequest);
             } else {
                 session.setAttribute(CURRENT_STREAM, inputStream);
@@ -156,7 +157,7 @@
                     }
                 }
 
-                currentWriteRequest.getFuture().setWritten(true);
+                currentWriteRequest.getFuture().setWritten();
                 nextFilter.messageSent(session, currentWriteRequest);
             } else {
                 nextFilter.filterWrite(session, new DefaultWriteRequest(

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,76 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+import java.util.Collection;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteException;
+import org.apache.mina.common.WriteRequest;
+
+/**
+ * An exception that is thrown by {@link WriteThrottleFilter} when
+ * there are too many scheduled write requests or too much amount 
+ * of scheduled write data in an {@link IoSession}'s internal write
+ * request queue.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class TooManyScheduledWritesException extends WriteException {
+
+    private static final long serialVersionUID = 7377810360950976904L;
+
+    public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+            String message, Throwable cause) {
+        super(requests, message, cause);
+    }
+
+    public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+            String s) {
+        super(requests, s);
+    }
+
+    public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+            Throwable cause) {
+        super(requests, cause);
+    }
+
+    public TooManyScheduledWritesException(Collection<WriteRequest> requests) {
+        super(requests);
+    }
+
+    public TooManyScheduledWritesException(WriteRequest request,
+            String message, Throwable cause) {
+        super(request, message, cause);
+    }
+
+    public TooManyScheduledWritesException(WriteRequest request, String s) {
+        super(request, s);
+    }
+
+    public TooManyScheduledWritesException(WriteRequest request, Throwable cause) {
+        super(request, cause);
+    }
+
+    public TooManyScheduledWritesException(WriteRequest request) {
+        super(request);
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,329 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteException;
+import org.apache.mina.common.WriteRequest;
+
+/**
+ * This filter will turn the asynchronous write method into a blocking send when there are more than
+ * the prescribed number of messages awaiting sending. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server, because blocking writes alone still allow the read
+ * thread to cause an out-of-memory error through a backlog of unprocessed messages.
+ *
+ * This should only be viewed as a temporary workaround for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter, as this issue will always occur on a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policies on what to do when the buffer is full.
+ *
+ *  They could be:
+ *          Block - As this does
+ *          Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ *          Throw Exception - through the client write() method to allow them to get immediate feedback on buffer state
+ * 
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class WriteThrottleFilter extends IoFilterAdapter {
+
+    private final Object logLock = new Object();
+    private final Object blockLock = new Object();
+
+    private long lastLogTime = -1;
+    private int blockWaiters = 0;
+    
+    private volatile WriteThrottlePolicy policy;
+    
+    private volatile int localMaxScheduledWriteMessages;
+    private volatile long localMaxScheduledWriteBytes;
+    private volatile int globalMaxScheduledWriteMessages;
+    private volatile long globalMaxScheduledWriteBytes;
+    
+    public WriteThrottleFilter() {
+        this(WriteThrottlePolicy.LOG);
+    }
+    
+    public WriteThrottleFilter(WriteThrottlePolicy policy) {
+        this(policy, 4096, 65536, 131072, 1048576 * 128);
+    }
+    
+    public WriteThrottleFilter(
+            int localMaxScheduledWriteMessages, long localMaxScheduledWriteBytes,
+            int globalMaxScheduledWriteMessages, long globalMaxScheduledWriteBytes) {
+        this(WriteThrottlePolicy.LOG,
+             localMaxScheduledWriteMessages, localMaxScheduledWriteBytes,
+             globalMaxScheduledWriteMessages, globalMaxScheduledWriteBytes);
+    }
+    
+    public WriteThrottleFilter(
+            WriteThrottlePolicy policy,
+            int localMaxScheduledWriteMessages, long localMaxScheduledWriteBytes,
+            int globalMaxScheduledWriteMessages, long globalMaxScheduledWriteBytes) {
+
+        setPolicy(policy);
+        setLocalMaxScheduledWriteMessages(localMaxScheduledWriteMessages);
+        setLocalMaxScheduledWriteBytes(localMaxScheduledWriteBytes);
+        setGlobalMaxScheduledWriteMessages(globalMaxScheduledWriteMessages);
+        setGlobalMaxScheduledWriteBytes(globalMaxScheduledWriteBytes);
+    }
+    
+    public WriteThrottlePolicy getPolicy() {
+        return policy;
+    }
+    
+    public void setPolicy(WriteThrottlePolicy policy) {
+        if (policy == null) {
+            throw new NullPointerException("policy");
+        }
+        this.policy = policy;
+    }
+
+    public int getLocalMaxScheduledWriteMessages() {
+        return localMaxScheduledWriteMessages;
+    }
+
+    public void setLocalMaxScheduledWriteMessages(int localMaxScheduledWriteMessages) {
+        if (localMaxScheduledWriteMessages < 0) {
+            localMaxScheduledWriteMessages = 0;
+        }
+        this.localMaxScheduledWriteMessages = localMaxScheduledWriteMessages;
+    }
+
+    public long getLocalMaxScheduledWriteBytes() {
+        return localMaxScheduledWriteBytes;
+    }
+
+    public void setLocalMaxScheduledWriteBytes(long localMaxScheduledWriteBytes) {
+        if (localMaxScheduledWriteBytes < 0) {
+            localMaxScheduledWriteBytes = 0;
+        }
+        this.localMaxScheduledWriteBytes = localMaxScheduledWriteBytes;
+    }
+
+    public int getGlobalMaxScheduledWriteMessages() {
+        return globalMaxScheduledWriteMessages;
+    }
+
+    public void setGlobalMaxScheduledWriteMessages(int globalMaxScheduledWriteMessages) {
+        if (globalMaxScheduledWriteMessages < 0) {
+            globalMaxScheduledWriteMessages = 0;
+        }
+        this.globalMaxScheduledWriteMessages = globalMaxScheduledWriteMessages;
+    }
+
+    public long getGlobalMaxScheduledWriteBytes() {
+        return globalMaxScheduledWriteBytes;
+    }
+
+    public void setGlobalMaxScheduledWriteBytes(long globalMaxScheduledWriteBytes) {
+        if (globalMaxScheduledWriteBytes < 0) {
+            globalMaxScheduledWriteBytes = 0;
+        }
+        this.globalMaxScheduledWriteBytes = globalMaxScheduledWriteBytes;
+    }
+
+    @Override
+    public void filterWrite(NextFilter nextFilter, IoSession session,
+            WriteRequest writeRequest) throws Exception {
+        // Read the volatile policy once: the OFF check and the switch must see the same value.
+        WriteThrottlePolicy policy = getPolicy();
+        if (policy != WriteThrottlePolicy.OFF) {
+            if (!readyToWrite(session)) {
+                switch (policy) {
+                case LOG:
+                    log(session);
+                    break;
+                case BLOCK:
+                    block(session);
+                    break;
+                case LOG_AND_BLOCK:
+                    log(session);
+                    block(session);
+                    break;
+                case EXCEPTION:
+                    raiseException(session, writeRequest); // always throws, so no fall-through
+                default:
+                    throw new InternalError(); // policy constant not handled above
+                }
+            }
+        }
+        // Not throttled (or throttling is over): hand the request to the next filter.
+        nextFilter.filterWrite(session, writeRequest);
+    }
+    
+    private boolean readyToWrite(IoSession session) {
+        if (session.isClosing()) {
+            return true;
+        }
+
+        int lmswm = localMaxScheduledWriteMessages;
+        long lmswb = localMaxScheduledWriteBytes;
+        int gmswm = globalMaxScheduledWriteMessages;
+        long gmswb = globalMaxScheduledWriteBytes;
+        
+        return (lmswm == 0 || session.getScheduledWriteMessages() < lmswm) &&
+               (lmswb == 0 || session.getScheduledWriteBytes() < lmswb) &&
+               (gmswm == 0 || session.getService().getScheduledWriteMessages() < gmswm) &&
+               (gmswb == 0 || session.getService().getScheduledWriteBytes() < gmswb);
+    }
+    
+    private void log(IoSession session) {
+        long currentTime = System.currentTimeMillis();
+        
+        // Prevent log flood by logging every 3 seconds.
+        boolean log;
+        synchronized (logLock) {
+            if (currentTime - lastLogTime > 3000) {
+                lastLogTime = currentTime;
+                log = true;
+            } else {
+                log = false;
+            }
+        }
+        
+        if (log) {
+            IoSessionLogger.getLogger(session, getClass()).warn(getMessage(session));
+        }
+    }
+    
+    @Override
+    public void messageSent(
+            NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+        notifyWaitingWriters();
+        nextFilter.messageSent(session, writeRequest);
+    }
+
+    @Override
+    public void exceptionCaught(NextFilter nextFilter, IoSession session,
+            Throwable cause) throws Exception {
+        try {
+            nextFilter.exceptionCaught(session, cause);
+        } finally {
+            notifyWaitingWriters();
+        }
+    }
+
+    @Override
+    public void sessionClosed(NextFilter nextFilter, IoSession session)
+            throws Exception {
+        notifyWaitingWriters();
+        nextFilter.sessionClosed(session);
+    }
+
+    private void block(IoSession session) { // waits until readyToWrite(session) becomes true
+        synchronized (blockLock) { // same monitor that notifyWaitingWriters() signals
+            blockWaiters ++; // lets notifyWaitingWriters() skip notifyAll() when nobody waits
+            while (!readyToWrite(session)) { // re-check the limits after every wake-up
+                try {
+                    blockLock.wait();
+                } catch (InterruptedException e) {
+                    // Ignored: keep waiting. NOTE(review): the interrupt status is not restored.
+                }
+            }
+            blockWaiters --;
+        }
+    }
+    
+    private void notifyWaitingWriters() {
+        synchronized (blockLock) {
+            if (blockWaiters != 0) {
+                blockLock.notifyAll();
+            }
+        }
+    }
+
+    private void raiseException(IoSession session, WriteRequest writeRequest) throws WriteException {
+        throw new TooManyScheduledWritesException(writeRequest, getMessage(session));
+    }
+    
+    private String getMessage(IoSession session) {
+        int lmswm = localMaxScheduledWriteMessages;
+        long lmswb = localMaxScheduledWriteBytes;
+        int gmswm = globalMaxScheduledWriteMessages;
+        long gmswb = globalMaxScheduledWriteBytes;
+
+        StringBuilder buf = new StringBuilder(512);
+        buf.append("Write requests flooded - local: ");
+        if (lmswm != 0) {
+            buf.append(session.getScheduledWriteMessages());
+            buf.append(" / ");
+            buf.append(lmswm);
+            buf.append(" msgs, ");
+        } else {
+            buf.append(session.getScheduledWriteMessages());
+            buf.append(" / unlimited msgs, ");
+        }
+        
+        if (lmswb != 0) {
+            buf.append(session.getScheduledWriteBytes());
+            buf.append(" / ");
+            buf.append(lmswb);
+            buf.append(" bytes, ");
+        } else {
+            buf.append(session.getScheduledWriteBytes());
+            buf.append(" / unlimited bytes, ");
+        }
+        
+        buf.append("global: ");
+        if (gmswm != 0) {
+            buf.append(session.getService().getScheduledWriteMessages());
+            buf.append(" / ");
+            buf.append(gmswm);
+            buf.append(" msgs, ");
+        } else {
+            buf.append(session.getService().getScheduledWriteMessages());
+            buf.append(" / unlimited msgs, ");
+        }
+        
+        if (gmswb != 0) {
+            buf.append(session.getService().getScheduledWriteBytes());
+            buf.append(" / ");
+            buf.append(gmswb);
+            buf.append(" bytes");
+        } else {
+            buf.append(session.getService().getScheduledWriteBytes());
+            buf.append(" / unlimited bytes");
+        }
+        
+        return buf.toString();
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java Fri Nov  2 04:57:00 2007
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+/**
+ * Tells {@link WriteThrottleFilter} what to do when there are too many
+ * scheduled write requests in the session buffer.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public enum WriteThrottlePolicy {
+    /**
+     * Do nothing; disables the filter.
+     */
+    OFF,
+    /**
+     * Log a warning message, but don't limit anything.
+     */
+    LOG,
+    /**
+     * Block the write operation until the scheduled writes drop back
+     * below the limits.  You must use this policy in conjunction with the
+     * {@link ReadThrottleFilterChainBuilder} to prevent the
+     * {@link OutOfMemoryError} on the reader side.
+     */
+    BLOCK,
+    /**
+     * Combination of {@link #LOG} and {@link #BLOCK}.
+     */
+    LOG_AND_BLOCK,
+    /**
+     * Raise a {@link TooManyScheduledWritesException}.
+     */
+    EXCEPTION,
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Fri Nov  2 04:57:00 2007
@@ -33,6 +33,7 @@
 import org.apache.mina.common.IoProcessor;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteToClosedSessionException;
 
 /**
  * @author The Apache MINA Project (dev@mina.apache.org)
@@ -199,9 +200,13 @@
                     flushPendingDataQueues(s);
                 }
             } else {
-                WriteRequest req;
-                while ((req = queue.poll()) != null) {
-                    req.getFuture().setWritten(false);
+                List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(queue);
+                if (!failedRequests.isEmpty()) {
+                    WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+                    for (WriteRequest r: failedRequests) {
+                        r.getFuture().setException(cause);
+                    }
+                    s.getFilterChain().fireExceptionCaught(cause);
                 }
             }
         }

Modified: mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java Fri Nov  2 04:57:00 2007
@@ -94,7 +94,7 @@
         TestThread thread = new TestThread(future);
         thread.start();
 
-        future.setWritten(true);
+        future.setWritten();
         thread.join();
 
         assertTrue(thread.success);
@@ -105,12 +105,13 @@
         thread = new TestThread(future);
         thread.start();
 
-        future.setWritten(false);
+        future.setException(new Exception());
         thread.join();
 
         assertTrue(thread.success);
         assertTrue(future.isReady());
         assertFalse(future.isWritten());
+        assertTrue(future.getException().getClass() == Exception.class);
     }
 
     public void testAddListener() throws Exception {

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Fri Nov  2 04:57:00 2007
@@ -25,10 +25,10 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.mina.common.BufferDataException;
 import org.apache.mina.common.IoBuffer;
 import org.apache.mina.filter.codec.ProtocolCodecSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.RecoverableProtocolDecoderException;
 
 /**
  * Tests {@link TextLineDecoder}.
@@ -273,7 +273,7 @@
         try {
             decoder.decode(session, in, out);
             Assert.fail();
-        } catch (BufferDataException e) {
+        } catch (RecoverableProtocolDecoderException e) {
             // Success!
         }
 
@@ -296,7 +296,7 @@
         try {
             decoder.decode(session, in, out);
             Assert.fail();
-        } catch (BufferDataException e) {
+        } catch (RecoverableProtocolDecoderException e) {
             // Success!
         }
 

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java Fri Nov  2 04:57:00 2007
@@ -535,8 +535,8 @@
             return written;
         }
 
-        public void setWritten(boolean written) {
-            this.written = written;
+        public void setWritten() {
+            this.written = true;
         }
 
         public IoSession getSession() {
@@ -589,6 +589,14 @@
 
         public boolean awaitUninterruptibly(long timeoutMillis) {
             return true;
+        }
+
+        public Throwable getException() {
+            return null;
+        }
+
+        public void setException(Throwable cause) {
+            throw new IllegalStateException();
         }
     }
 }

Modified: mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java (original)
+++ mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java Fri Nov  2 04:57:00 2007
@@ -19,16 +19,16 @@
  */
 package org.apache.mina.example.echoserver;
 
-import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteException;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.SocketSession;
 import org.apache.mina.transport.socket.SocketSessionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link IoHandler} implementation for echo server.
@@ -37,9 +37,6 @@
  * @version $Rev$, $Date$,
  */
 public class EchoProtocolHandler extends IoHandlerAdapter {
-    private static final Logger log = LoggerFactory
-            .getLogger(EchoProtocolHandler.class);
-
     @Override
     public void sessionCreated(IoSession session) {
         if (session instanceof SocketSession) {
@@ -55,13 +52,17 @@
 
     @Override
     public void sessionIdle(IoSession session, IdleStatus status) {
-        log.info("*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE)
-                + " ***");
+        IoSessionLogger.getLogger(session).info(
+                "*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
     }
 
     @Override
     public void exceptionCaught(IoSession session, Throwable cause) {
-        cause.printStackTrace();
+        IoSessionLogger.getLogger(session).warn(cause);
+        if (cause instanceof WriteException) {
+            WriteException e = (WriteException) cause;
+            IoSessionLogger.getLogger(session).warn("Failed write requests: {}", e.getRequests());
+        }
         session.close();
     }
 

Modified: mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java (original)
+++ mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java Fri Nov  2 04:57:00 2007
@@ -23,13 +23,14 @@
 
 import junit.framework.Assert;
 
-import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionLogger;
 import org.apache.mina.common.RuntimeIoException;
+import org.apache.mina.common.WriteException;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
 import org.apache.mina.filter.ssl.SslFilter;
@@ -50,6 +51,7 @@
 
     private final int DATA_SIZE = 16;
 
+    private EchoConnectorHandler handler;
     private SslFilter connectorSSLFilter;
 
     public ConnectorTest() {
@@ -58,7 +60,7 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-
+        handler = new EchoConnectorHandler();
         connectorSSLFilter = new SslFilter(BogusSslContextFactory
                 .getInstance(false));
         connectorSSLFilter.setUseClientMode(true); // set client mode
@@ -85,6 +87,8 @@
     }
 
     private void testConnector(IoConnector connector) throws Exception {
+        connector.setHandler(handler);
+
         System.out.println("* Without localAddress");
         testConnector(connector, false);
 
@@ -94,11 +98,8 @@
 
     private void testConnector(IoConnector connector, boolean useLocalAddress)
             throws Exception {
-        EchoConnectorHandler handler = new EchoConnectorHandler();
-
         IoSession session = null;
         if (!useLocalAddress) {
-            connector.setHandler(handler);
             ConnectFuture future = connector.connect(new InetSocketAddress(
                     "localhost", port));
             future.awaitUninterruptibly();
@@ -109,7 +110,6 @@
                 clientPort = AvailablePortFinder
                         .getNextAvailable(clientPort + 1);
                 try {
-                    connector.setHandler(handler);
                     ConnectFuture future = connector.connect(
                             new InetSocketAddress("localhost", port),
                             new InetSocketAddress(clientPort));
@@ -245,7 +245,11 @@
 
         @Override
         public void exceptionCaught(IoSession session, Throwable cause) {
-            cause.printStackTrace();
+            IoSessionLogger.getLogger(session).warn(cause);
+            if (cause instanceof WriteException) {
+                WriteException e = (WriteException) cause;
+                IoSessionLogger.getLogger(session).warn("Failed write requests: {}", e.getRequests());
+            }
         }
     }
 }