You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/07 12:02:05 UTC

svn commit: r592697 - in /mina/trunk/core/src: main/java/org/apache/mina/common/ main/java/org/apache/mina/transport/socket/nio/ main/java/org/apache/mina/transport/vmpipe/ main/java/org/apache/mina/util/ test/java/org/apache/mina/filter/codec/textline/

Author: trustin
Date: Wed Nov  7 03:01:58 2007
New Revision: 592697

URL: http://svn.apache.org/viewvc?rev=592697&view=rev
Log:
* Replaced Queue<WriteRequest> with WriteRequestQueue, which provides a dispose() method needed by certain queue implementations such as a JMS-backed queue.
* Optimized CircularQueue.clear() a little bit.


Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java
    mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
    mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Wed Nov  7 03:01:58 2007
@@ -364,7 +364,8 @@
             case OPEN:
                 try {
                     boolean flushedAll = flush(session);
-                    if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
+                        !session.isScheduledForFlush()) {
                         scheduleFlush(session);
                     }
                 } catch (Exception e) {
@@ -387,12 +388,12 @@
     }
 
     private void clearWriteRequestQueue(AbstractIoSession session) {
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
         
         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
 
-        if ((req = writeRequestQueue.poll()) != null) {
+        if ((req = writeRequestQueue.poll(session)) != null) {
             Object m = req.getMessage();
             if (m instanceof IoBuffer) {
                 IoBuffer buf = (IoBuffer) req.getMessage();
@@ -410,7 +411,7 @@
             }
 
             // Discard others.
-            while ((req = writeRequestQueue.poll()) != null) {
+            while ((req = writeRequestQueue.poll(session)) != null) {
                 failedRequests.add(req);
             }
         }
@@ -435,7 +436,7 @@
         // Clear OP_WRITE
         setOpWrite(session, false);
 
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
 
         // Set limitation for the number of written bytes for read-write
         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
@@ -446,10 +447,13 @@
 
         do {
             // Check for pending writes.
-            WriteRequest req = writeRequestQueue.peek();
-
+            WriteRequest req = session.getCurrentWriteRequest();
             if (req == null) {
-                break;
+                req = writeRequestQueue.poll(session);
+                if (req == null) {
+                    break;
+                }
+                session.setCurrentWriteRequest(req);
             }
 
             long localWrittenBytes = 0;
@@ -458,8 +462,8 @@
                 FileRegion region = (FileRegion) message;
 
                 if (region.getCount() <= 0) {
-                    // File has been sent, remove from queue
-                    writeRequestQueue.poll();
+                    // File has been sent, clear the current request.
+                    session.setCurrentWriteRequest(null);
                     session.getFilterChain().fireMessageSent(req);
                     continue;
                 }
@@ -469,8 +473,8 @@
             } else {
                 IoBuffer buf = (IoBuffer) message;
                 if (buf.remaining() == 0) {
-                    // Buffer has been completely sent, remove request form queue
-                    writeRequestQueue.poll();
+                    // Buffer has been sent, clear the current request.
+                    session.setCurrentWriteRequest(null);
                     buf.reset();
                     session.getFilterChain().fireMessageSent(req);
                     continue;
@@ -518,7 +522,7 @@
                 try {
                     setOpWrite(
                             session,
-                            !session.getWriteRequestQueue().isEmpty() &&
+                            !session.getWriteRequestQueue().isEmpty(session) &&
                                     (mask & SelectionKey.OP_WRITE) != 0);
                 } catch (Exception e) {
                     session.getFilterChain().fireExceptionCaught(e);

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Wed Nov  7 03:01:58 2007
@@ -24,9 +24,6 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,8 +57,8 @@
     private final Object lock = new Object();
     
     private IoSessionAttributeMap attributes;
-    private Queue<WriteRequest> writeRequestQueue;
-
+    private WriteRequestQueue writeRequestQueue;
+    private WriteRequest currentWriteRequest;
     private final long creationTime;
 
     /**
@@ -74,33 +71,22 @@
 
     // Status variables
     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
-
     private final AtomicLong scheduledWriteBytes = new AtomicLong();
-
     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
 
     private long readBytes;
-
     private long writtenBytes;
-
     private long readMessages;
-
     private long writtenMessages;
-
     private long lastReadTime;
-
     private long lastWriteTime;
 
     private int idleCountForBoth;
-
     private int idleCountForRead;
-
     private int idleCountForWrite;
 
     private long lastIdleTimeForBoth;
-
     private long lastIdleTimeForRead;
-
     private long lastIdleTimeForWrite;
 
     private boolean deferDecreaseReadBuffer = true;
@@ -161,7 +147,7 @@
     }
 
     public CloseFuture closeOnFlush() {
-        getWriteRequestQueue().offer(CLOSE_REQUEST);
+        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
         getProcessor().flush(this);
         return closeFuture;
     }
@@ -286,8 +272,9 @@
         this.attributes = attributes;
     }
     
-    protected void setWriteRequestQueue(Queue<WriteRequest> writeRequestQueue) {
-        this.writeRequestQueue = new WriteRequestQueue(writeRequestQueue);
+    protected void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
+        this.writeRequestQueue =
+            new CloseRequestAwareWriteRequestQueue(writeRequestQueue);
     }
 
     public TrafficMask getTrafficMask() {
@@ -441,9 +428,17 @@
         }
     }
 
-    protected Queue<WriteRequest> getWriteRequestQueue() {
+    protected WriteRequestQueue getWriteRequestQueue() {
         return writeRequestQueue;
     }
+    
+    protected WriteRequest getCurrentWriteRequest() {
+        return currentWriteRequest;
+    }
+    
+    protected void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
+        this.currentWriteRequest = currentWriteRequest;
+    }
 
     protected void increaseReadBufferSize() {
         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
@@ -589,97 +584,38 @@
         }
     }
     
-    private class WriteRequestQueue implements Queue<WriteRequest> {
+    private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
         
-        private final Queue<WriteRequest> q;
+        private final WriteRequestQueue q;
         
-        public WriteRequestQueue(Queue<WriteRequest> q) {
+        public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
             this.q = q;
         }
 
-        // Discard close request offered by closeOnFlush() silently.
-        public WriteRequest peek() {
-            WriteRequest answer = q.peek();
+        public synchronized WriteRequest poll(IoSession session) {
+            WriteRequest answer = q.poll(session);
             if (answer == CLOSE_REQUEST) {
                 AbstractIoSession.this.close();
-                clear();
+                dispose(session);
                 answer = null;
             }
             return answer;
         }
-
-        public synchronized WriteRequest poll() {
-            WriteRequest answer = q.poll();
-            if (answer == CLOSE_REQUEST) {
-                AbstractIoSession.this.close();
-                clear();
-                answer = null;
-            }
-            return answer;
-        }
-
-        public boolean add(WriteRequest e) {
-            return q.add(e);
-        }
-
-        public WriteRequest element() {
-            return q.element();
-        }
-
-        public boolean offer(WriteRequest e) {
-            return q.offer(e);
-        }
-
-        public WriteRequest remove() {
-            return q.remove();
-        }
-
-        public boolean addAll(Collection<? extends WriteRequest> c) {
-            return q.addAll(c);
-        }
-
-        public void clear() {
-            q.clear();
-        }
-
-        public boolean contains(Object o) {
-            return q.contains(o);
-        }
-
-        public boolean containsAll(Collection<?> c) {
-            return q.containsAll(c);
-        }
-
-        public boolean isEmpty() {
-            return q.isEmpty();
-        }
-
-        public Iterator<WriteRequest> iterator() {
-            return q.iterator();
-        }
-
-        public boolean remove(Object o) {
-            return q.remove(o);
-        }
-
-        public boolean removeAll(Collection<?> c) {
-            return q.removeAll(c);
-        }
-
-        public boolean retainAll(Collection<?> c) {
-            return q.retainAll(c);
+        
+        public void offer(IoSession session, WriteRequest e) {
+            q.offer(session, e);
         }
 
-        public int size() {
-            return q.size();
+        public boolean isEmpty(IoSession session) {
+            return q.isEmpty(session);
         }
-
-        public Object[] toArray() {
-            return q.toArray();
+        
+        public void clear(IoSession session) {
+            q.clear(session);
         }
 
-        public <T> T[] toArray(T[] a) {
-            return q.toArray(a);
+        public void dispose(IoSession session) {
+            q.dispose(session);
         }
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Wed Nov  7 03:01:58 2007
@@ -632,7 +632,7 @@
                 s.increaseScheduledWriteMessages();
             }
 
-            s.getWriteRequestQueue().add(writeRequest);
+            s.getWriteRequestQueue().offer(s, writeRequest);
             if (s.getTrafficMask().isWritable()) {
                 s.getProcessor().flush(s);
             }
@@ -683,10 +683,14 @@
                 session.getHandler().sessionClosed(session);
             } finally {
                 try {
-                    ((AbstractIoSession) session).getAttributeMap().dispose(session);
+                    ((AbstractIoSession) session).getWriteRequestQueue().dispose(session);
                 } finally {
-                    // Remove all filters.
-                    session.getFilterChain().clear();
+                    try {
+                        ((AbstractIoSession) session).getAttributeMap().dispose(session);
+                    } finally {
+                        // Remove all filters.
+                        session.getFilterChain().clear();
+                    }
                 }
             }
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java Wed Nov  7 03:01:58 2007
@@ -27,7 +27,6 @@
 import java.util.Set;
 
 import org.apache.mina.util.CircularQueue;
-import org.apache.mina.util.SynchronizedQueue;
 
 /**
  * The default {@link IoSessionDataStructureFactory} implementation
@@ -46,9 +45,9 @@
         return new DefaultIoSessionAttributeMap();
     }
     
-    public Queue<WriteRequest> getWriteRequestQueue(IoSession session)
+    public WriteRequestQueue getWriteRequestQueue(IoSession session)
             throws Exception {
-        return new SynchronizedQueue<WriteRequest>(new CircularQueue<WriteRequest>(128));
+        return new DefaultWriteRequestQueue();
     }
 
     private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
@@ -169,6 +168,35 @@
         }
 
         public void dispose(IoSession session) throws Exception {
+        }
+    }
+    
+    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
+
+        private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(128);
+        
+        public void dispose(IoSession session) {
+        }
+        
+        public void clear(IoSession session) {
+            q.clear();
+        }
+
+        public synchronized boolean isEmpty(IoSession session) {
+            return q.isEmpty();
+        }
+
+        public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+            q.offer(writeRequest);
+        }
+
+        public synchronized WriteRequest poll(IoSession session) {
+            return q.poll();
+        }
+        
+        @Override
+        public String toString() {
+            return q.toString();
         }
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Wed Nov  7 03:01:58 2007
@@ -105,7 +105,7 @@
 
             public void flush(IoSession session) {
                 getFilterChain().fireMessageSent(
-                        ((DummySession) session).getWriteRequestQueue().poll());
+                        ((DummySession) session).getWriteRequestQueue().poll(session));
             }
 
             public void remove(IoSession session) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Wed Nov  7 03:01:58 2007
@@ -21,7 +21,6 @@
 
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Queue;
 import java.util.Set;
 
 /**
@@ -137,12 +136,12 @@
 
         AbstractIoSession s = (AbstractIoSession) session;
         if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
-                !s.getWriteRequestQueue().isEmpty()) {
-            Queue<WriteRequest> queue = s.getWriteRequestQueue();
-            WriteRequest request = queue.peek();
+                !s.getWriteRequestQueue().isEmpty(session)) {
+            WriteRequest request = s.getCurrentWriteRequest();
             if (request != null) {
+                s.setCurrentWriteRequest(null);
                 WriteTimeoutException cause = new WriteTimeoutException(request);
-                queue.poll().getFuture().setException(cause);
+                request.getFuture().setException(cause);
                 s.getFilterChain().fireExceptionCaught(cause);
                 // WriteException is an IOException, so we close the session.
                 s.close();

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java Wed Nov  7 03:01:58 2007
@@ -20,7 +20,6 @@
 package org.apache.mina.common;
 
 import java.util.Comparator;
-import java.util.Queue;
 
 /**
  * Provides data structures to a newly created session.
@@ -37,12 +36,12 @@
     IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
     
     /**
-     * Returns an {@link Queue} of {@link WriteRequest}s which is going to be
-     * associated with the specified <tt>session</tt>.  Please note that the
-     * returned implementation must be thread-safe and robust enough to deal
+     * Returns a {@link WriteRequestQueue} which is going to be associated with
+     * the specified <tt>session</tt>.  Please note that the returned
+     * implementation must be thread-safe and robust enough to deal
      * with various messages types (even what you didn't expect at all),
      * especially when you are going to implement a priority queue which
      * involves {@link Comparator}.
      */
-    Queue<WriteRequest> getWriteRequestQueue(IoSession session) throws Exception;
+    WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
 }

Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java?rev=592697&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java Wed Nov  7 03:01:58 2007
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+
+/**
+ * Stores {@link WriteRequest}s which are queued to an {@link IoSession}.
+ * 
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface WriteRequestQueue {
+
+    WriteRequest poll(IoSession session);
+    void offer(IoSession session, WriteRequest writeRequest);
+    boolean isEmpty(IoSession session);
+    void clear(IoSession session);
+    
+    /**
+     * Releases any resources associated with the specified session.
+     * This method is invoked on disconnection.
+     */
+    void dispose(IoSession session);
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Wed Nov  7 03:01:58 2007
@@ -44,6 +44,7 @@
 import org.apache.mina.common.RuntimeIoException;
 import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
 import org.apache.mina.transport.socket.DatagramAcceptor;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
@@ -366,7 +367,8 @@
 
             try {
                 boolean flushedAll = flush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
+                    !session.isScheduledForFlush()) {
                     scheduleFlush(session);
                 }
             } catch (IOException e) {
@@ -388,20 +390,24 @@
         key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
 
         DatagramChannel ch = session.getChannel();
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
 
         int writtenBytes = 0;
         int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
         for (; ;) {
-            WriteRequest req = writeRequestQueue.peek();
+            WriteRequest req = session.getCurrentWriteRequest();
             if (req == null) {
-                break;
+                req = writeRequestQueue.poll(session);
+                if (req == null) {
+                    break;
+                }
+                session.setCurrentWriteRequest(req);
             }
 
             IoBuffer buf = (IoBuffer) req.getMessage();
             if (buf.remaining() == 0) {
-                // pop and fire event
-                writeRequestQueue.poll();
+                // Clear and fire event
+                session.setCurrentWriteRequest(null);
                 buf.reset();
                 session.getFilterChain().fireMessageSent(req);
                 continue;
@@ -420,8 +426,8 @@
             } else {
                 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
 
-                // pop and fire event
-                writeRequestQueue.poll();
+                // Clear and fire event
+                session.setCurrentWriteRequest(null);
                 writtenBytes += localWrittenBytes;
                 buf.reset();
                 session.getFilterChain().fireMessageSent(req);

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java Wed Nov  7 03:01:58 2007
@@ -24,7 +24,6 @@
 import java.net.SocketException;
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
-import java.util.Queue;
 
 import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.DefaultTransportMetadata;
@@ -37,6 +36,7 @@
 import org.apache.mina.common.RuntimeIoException;
 import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
 import org.apache.mina.transport.socket.DatagramSession;
 import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -153,8 +153,18 @@
     }
 
     @Override
-    protected Queue<WriteRequest> getWriteRequestQueue() {
+    protected WriteRequestQueue getWriteRequestQueue() {
         return super.getWriteRequestQueue();
+    }
+
+    @Override
+    protected WriteRequest getCurrentWriteRequest() {
+        return super.getCurrentWriteRequest();
+    }
+
+    @Override
+    protected void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
+        super.setCurrentWriteRequest(currentWriteRequest);
     }
 
     @Override

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Wed Nov  7 03:01:58 2007
@@ -33,6 +33,7 @@
 import org.apache.mina.common.IoProcessor;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
 import org.apache.mina.common.WriteToClosedSessionException;
 
 /**
@@ -168,15 +169,15 @@
     private class VmPipeIoProcessor implements IoProcessor {
         public void flush(IoSession session) {
             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            Queue<WriteRequest> queue = s.getWriteRequestQueue();
-            if (queue.isEmpty()) {
+            WriteRequestQueue queue = s.getWriteRequestQueue();
+            if (queue.isEmpty(s)) {
                 return;
             }
             if (s.isConnected()) {
                 if (s.getLock().tryLock()) {
                     try {
                         WriteRequest req;
-                        while ((req = queue.poll()) != null) {
+                        while ((req = queue.poll(s)) != null) {
                             Object message = req.getMessage();
                             Object messageCopy = message;
                             if (message instanceof IoBuffer) {
@@ -200,7 +201,12 @@
                     flushPendingDataQueues(s);
                 }
             } else {
-                List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(queue);
+                List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
+                WriteRequest req;
+                while ((req = queue.poll(s)) != null) {
+                    failedRequests.add(req);
+                }
+                
                 if (!failedRequests.isEmpty()) {
                     WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
                     for (WriteRequest r: failedRequests) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Wed Nov  7 03:01:58 2007
@@ -19,7 +19,6 @@
  */
 package org.apache.mina.transport.vmpipe;
 
-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.Lock;
@@ -34,7 +33,7 @@
 import org.apache.mina.common.IoServiceListenerSupport;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
 
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
@@ -153,7 +152,7 @@
     }
 
     @Override
-    protected Queue<WriteRequest> getWriteRequestQueue() {
+    protected WriteRequestQueue getWriteRequestQueue() {
         return super.getWriteRequestQueue();
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java Wed Nov  7 03:01:58 2007
@@ -73,10 +73,12 @@
 
     @Override
     public void clear() {
-        Arrays.fill(items, null);
-        first = 0;
-        last = 0;
-        full = false;
+        if (!isEmpty()) {
+            Arrays.fill(items, null);
+            first = 0;
+            last = 0;
+            full = false;
+        }
     }
 
     @SuppressWarnings("unchecked")

Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Wed Nov  7 03:01:58 2007
@@ -282,8 +282,9 @@
         Assert.assertEquals("B", session.getDecoderOutputQueue().poll());
 
         // Make sure OOM is not thrown.
+        System.gc();
         long oldFreeMemory = Runtime.getRuntime().freeMemory();
-        in = IoBuffer.allocate(1048576 * 16).mark();
+        in = IoBuffer.allocate(1048576 * 16).sweep((byte) ' ').mark();
         for (int i = 0; i < 10; i ++) {
             decoder.decode(session, in.reset().mark(), out);
             Assert.assertEquals(0, session.getDecoderOutputQueue().size());