You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/14 12:04:56 UTC

svn commit: r575603 - in /mina: branches/1.0/core/src/main/java/org/apache/mina/common/support/ branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/ branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/ branches...

Author: trustin
Date: Fri Sep 14 03:04:45 2007
New Revision: 575603

URL: http://svn.apache.org/viewvc?rev=575603&view=rev
Log:
Resolved issues:
* DIRMINA-429 - AbstractIoFilterChain.doWrite slows as outstanding writes increase
* DIRMINA-430 - IoSession.getScheduledWriteBytes() slows with size of queue
* DIRMINA-431 - Deadlock in VmPipe mode

How I resolved them:
* Applied the same fix to Datagram transport
* Backported all fixes to 1.0
* Moved some common methods to BaseIoSession (AbstractIoSession)


Added:
    mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java   (with props)
Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java
    mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Sep 14 03:04:45 2007
@@ -27,12 +27,17 @@
 
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TrafficMask;
 import org.apache.mina.common.WriteFuture;
 import org.apache.mina.common.IoFilter.WriteRequest;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Base implementation of {@link IoSession}.
  * 
@@ -40,6 +45,16 @@
  * @version $Rev$, $Date$
  */
 public abstract class BaseIoSession implements IoSession {
+    
+    private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+        new IoFutureListener() {
+            public void operationComplete(IoFuture future) {
+                BaseIoSession s = (BaseIoSession) future.getSession();
+                s.scheduledWriteBytes.set(0);
+                s.scheduledWriteRequests.set(0);
+            }
+        };
+
     private final Map attributes = new HashMap(8);
 
     private final long creationTime;
@@ -49,6 +64,12 @@
      */
     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
 
+    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+    
+    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
+    private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
+
     private boolean closing;
 
     // Configuration variables
@@ -90,6 +111,7 @@
     protected BaseIoSession() {
         creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
                 .currentTimeMillis();
+        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
     }
 
     public boolean isConnected() {
@@ -103,6 +125,19 @@
     public CloseFuture getCloseFuture() {
         return closeFuture;
     }
+    
+    public boolean isScheduledForFlush() {
+        return scheduledForFlush.get();
+    }
+    
+    public boolean setScheduledForFlush(boolean flag) {
+        if (flag) {
+            return scheduledForFlush.compareAndSet(false, true);
+        } else {
+            scheduledForFlush.set(false);
+            return true;
+        }
+    }
 
     public CloseFuture close() {
         synchronized (this) {
@@ -302,6 +337,14 @@
     public long getWrittenMessages() {
         return writtenMessages;
     }
+    
+    public int getScheduledWriteBytes() {
+        return scheduledWriteBytes.get();
+    }
+    
+    public int getScheduledWriteRequests() {
+        return scheduledWriteRequests.get();
+    }
 
     public void increaseReadBytes(int increment) {
         if (increment > 0) {
@@ -318,15 +361,26 @@
             lastWriteTime = System.currentTimeMillis();
             idleCountForBoth = 0;
             idleCountForWrite = 0;
+            
+            scheduledWriteBytes.addAndGet(-increment);
         }
     }
-
+    
     public void increaseReadMessages() {
         readMessages++;
     }
 
     public void increaseWrittenMessages() {
         writtenMessages++;
+        scheduledWriteRequests.decrementAndGet();
+    }
+    
+    public void increaseScheduledWriteBytes(int increment) {
+        scheduledWriteBytes.addAndGet(increment);
+    }
+
+    public void increaseScheduledWriteRequests() {
+        scheduledWriteRequests.incrementAndGet();
     }
 
     public long getCreationTime() {

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Sep 14 03:04:45 2007
@@ -45,14 +45,22 @@
 
         // SocketIoProcessor.doFlush() will reset it after write is finished
         // because the buffer will be passed with messageSent event. 
-        ((ByteBuffer) writeRequest.getMessage()).mark();
+        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+        buffer.mark();
+        
+        int remaining = buffer.remaining();
+        if (remaining == 0) {
+            s.increaseScheduledWriteRequests();            
+        } else {
+            s.increaseScheduledWriteBytes(buffer.remaining());
+        }
+
         synchronized (writeRequestQueue) {
             writeRequestQueue.push(writeRequest);
-            if (writeRequestQueue.size() == 1
-                    && session.getTrafficMask().isWritable()) {
-                // Notify SocketIoProcessor only when writeRequestQueue was empty.
-                s.getIoProcessor().flush(s);
-            }
+        }
+        
+        if (session.getTrafficMask().isWritable()) {
+            s.getIoProcessor().flush(s);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -101,10 +101,11 @@
     }
 
     void flush(SocketSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = getSelector();
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = getSelector();
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
@@ -122,10 +123,16 @@
         }
     }
 
-    private void scheduleFlush(SocketSessionImpl session) {
-        synchronized (flushingSessions) {
-            flushingSessions.push(session);
+    private boolean scheduleFlush(SocketSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            synchronized (flushingSessions) {
+                flushingSessions.push(session);
+            }
+            
+            return true;
         }
+        
+        return false;
     }
 
     private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -342,6 +349,8 @@
 
             if (session == null)
                 break;
+            
+            session.setScheduledForFlush(false);
 
             if (!session.isConnected()) {
                 releaseWriteBuffers(session);
@@ -362,7 +371,10 @@
             }
 
             try {
-                doFlush(session);
+                boolean flushedAll = doFlush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 scheduleRemove(session);
                 session.getFilterChain().fireExceptionCaught(session, e);
@@ -403,7 +415,7 @@
         }
     }
 
-    private void doFlush(SocketSessionImpl session) throws IOException {
+    private boolean doFlush(SocketSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -444,12 +456,14 @@
                 if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much.
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 }
             }
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void doUpdateTrafficMask() {

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -158,18 +158,6 @@
         return writeRequestQueue;
     }
 
-    public int getScheduledWriteRequests() {
-        synchronized (writeRequestQueue) {
-            return writeRequestQueue.messageSize();
-        }
-    }
-
-    public int getScheduledWriteBytes() {
-        synchronized (writeRequestQueue) {
-            return writeRequestQueue.byteSize();
-        }
-    }
-
     protected void write0(WriteRequest writeRequest) {
         filterChain.fireFilterWrite(this, writeRequest);
     }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Sep 14 03:04:45 2007
@@ -287,19 +287,25 @@
     }
 
     public void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = getSelector();
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = getSelector();
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
     public void closeSession(DatagramSessionImpl session) {
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        synchronized (flushingSessions) {
-            flushingSessions.push(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            synchronized (flushingSessions) {
+                flushingSessions.push(session);
+            }
+            return true;
+        } else {
+            return false;
         }
     }
 
@@ -416,23 +422,28 @@
             if (session == null)
                 break;
 
+            session.setScheduledForFlush(false);
+            
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -476,7 +487,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     // pop and fire event
                     synchronized (writeRequestQueue) {
@@ -491,6 +502,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Sep 14 03:04:45 2007
@@ -229,16 +229,22 @@
     }
 
     public void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = getSelector();
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = getSelector();
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        synchronized (flushingSessions) {
-            flushingSessions.push(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            synchronized (flushingSessions) {
+                flushingSessions.push(session);
+            }
+            return true;
+        } else {
+            return false;
         }
     }
 
@@ -418,24 +424,29 @@
 
             if (session == null)
                 break;
+            
+            session.setScheduledForFlush(false);
 
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -473,7 +484,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
     
@@ -490,6 +501,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -43,14 +43,22 @@
 
         // SocketIoProcessor.doFlush() will reset it after write is finished
         // because the buffer will be passed with messageSent event. 
-        ((ByteBuffer) writeRequest.getMessage()).mark();
+        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+        buffer.mark();
+        
+        int remaining = buffer.remaining();
+        if (remaining == 0) {
+            s.increaseScheduledWriteRequests();            
+        } else {
+            s.increaseScheduledWriteBytes(buffer.remaining());
+        }
+
         synchronized (writeRequestQueue) {
             writeRequestQueue.push(writeRequest);
-            if (writeRequestQueue.size() == 1
-                    && session.getTrafficMask().isWritable()) {
-                // Notify DatagramService only when writeRequestQueue was empty.
-                s.getManagerDelegate().flushSession(s);
-            }
+        }
+        
+        if (session.getTrafficMask().isWritable()) {
+            s.getManagerDelegate().flushSession(s);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -167,18 +167,6 @@
         filterChain.fireFilterWrite(this, writeRequest);
     }
 
-    public int getScheduledWriteRequests() {
-        synchronized (writeRequestQueue) {
-            return writeRequestQueue.messageSize();
-        }
-    }
-
-    public int getScheduledWriteBytes() {
-        synchronized (writeRequestQueue) {
-            return writeRequestQueue.byteSize();
-        }
-    }
-
     public TransportType getTransportType() {
         return TransportType.DATAGRAM;
     }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Fri Sep 14 03:04:45 2007
@@ -89,7 +89,7 @@
 
         DefaultConnectFuture future = new DefaultConnectFuture();
         VmPipeSessionImpl localSession = new VmPipeSessionImpl(this, config,
-                getListeners(), new Object(), // lock
+                getListeners(), // lock
                 new AnonymousSocketAddress(), handler, entry);
 
         // initialize connector session
@@ -127,6 +127,8 @@
             remoteSession.close();
         }
 
+        // Start chains, and then allow any messages read/written to be processed. This is to ensure that
+        // sessionOpened gets received before a messageReceived
         ((VmPipeFilterChain) localSession.getFilterChain()).start();
         ((VmPipeFilterChain) remoteSession.getFilterChain()).start();
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -27,6 +27,7 @@
 
 import edu.emory.mathcs.backport.java.util.Queue;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
@@ -36,20 +37,22 @@
 
     private final Queue eventQueue = new ConcurrentLinkedQueue();
 
-    private boolean flushEnabled;
+    private final AtomicBoolean flushEnabled = new AtomicBoolean();
+    private final AtomicBoolean sessionOpened = new AtomicBoolean();
 
     public VmPipeFilterChain(IoSession session) {
         super(session);
     }
 
     public void start() {
-        flushEnabled = true;
+        flushEnabled.set(true);
         flushEvents();
+        flushPendingDataQueues((VmPipeSessionImpl) getSession());
     }
 
     private void pushEvent(Event e) {
         eventQueue.offer(e);
-        if (flushEnabled) {
+        if (flushEnabled.get()) {
             flushEvents();
         }
     }
@@ -68,12 +71,8 @@
 
         if (type == EventType.RECEIVED) {
             VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            synchronized (s.lock) {
-                if (!s.getTrafficMask().isReadable()) {
-                    synchronized (s.pendingDataQueue) {
-                        s.pendingDataQueue.push(data);
-                    }
-                } else {
+            if (sessionOpened.get() && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+                try {
                     int byteCount = 1;
                     if (data instanceof ByteBuffer) {
                         byteCount = ((ByteBuffer) data).remaining();
@@ -82,7 +81,13 @@
                     s.increaseReadBytes(byteCount);
 
                     super.fireMessageReceived(s, data);
+                } finally {
+                    s.getLock().unlock();
                 }
+                
+                flushPendingDataQueues(s);
+            } else {
+                s.pendingDataQueue.add(data);
             }
         } else if (type == EventType.WRITE) {
             super.fireFilterWrite(session, (WriteRequest) data);
@@ -94,6 +99,7 @@
             super.fireSessionIdle(session, (IdleStatus) data);
         } else if (type == EventType.OPENED) {
             super.fireSessionOpened(session);
+            sessionOpened.set(true);
         } else if (type == EventType.CREATED) {
             super.fireSessionCreated(session);
         } else if (type == EventType.CLOSED) {
@@ -103,6 +109,11 @@
         }
     }
 
+    private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
+        s.updateTrafficMask();
+        s.getRemoteSession().updateTrafficMask();
+    }
+    
     public void fireFilterClose(IoSession session) {
         pushEvent(new Event(EventType.CLOSE, null));
     }
@@ -141,14 +152,9 @@
 
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
-            if (s.isConnected()) {
-
-                if (!s.getTrafficMask().isWritable()) {
-                    synchronized (s.pendingDataQueue) {
-                        s.pendingDataQueue.push(writeRequest);
-                    }
-                } else {
+        if (s.isConnected()) {
+            if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+                try {
                     Object message = writeRequest.getMessage();
 
                     int byteCount = 1;
@@ -164,26 +170,40 @@
                         messageCopy = wb;
                     }
 
+                    // Increase scheduledWrite* first so that the decrements performed by
+                    // increaseWrittenBytes()/increaseWrittenMessages() below do not make them negative.
+                    s.increaseScheduledWriteBytes(byteCount);
+                    s.increaseScheduledWriteRequests();
+
                     s.increaseWrittenBytes(byteCount);
                     s.increaseWrittenMessages();
 
                     s.getRemoteSession().getFilterChain().fireMessageReceived(
                             s.getRemoteSession(), messageCopy);
                     s.getFilterChain().fireMessageSent(s, writeRequest);
+                } finally {
+                    s.getLock().unlock();
                 }
+                
+                flushPendingDataQueues( s );
             } else {
-                writeRequest.getFuture().setWritten(false);
+                s.pendingDataQueue.add(writeRequest);
             }
+        } else {
+            writeRequest.getFuture().setWritten(false);
         }
     }
 
     protected void doClose(IoSession session) {
         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-        synchronized (s.lock) {
+        s.getLock().lock();
+        try {
             if (!session.getCloseFuture().isClosed()) {
                 s.getServiceListeners().fireSessionDestroyed(s);
                 s.getRemoteSession().close();
             }
+        } finally {
+            s.getLock().unlock();
         }
     }
 
@@ -223,7 +243,7 @@
 
         private final Object data;
 
-        public Event(EventType type, Object data) {
+        private Event(EventType type, Object data) {
             this.type = type;
             this.data = data;
         }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -34,6 +34,9 @@
 import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.util.Queue;
 
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+
 /**
  * A {@link IoSession} for in-VM transport (VM_PIPE).
  * 
@@ -62,7 +65,7 @@
 
     private final VmPipeSessionImpl remoteSession;
 
-    final Object lock;
+    private final Lock lock;
 
     final Queue pendingDataQueue;
 
@@ -70,12 +73,12 @@
      * Constructor for client-side session.
      */
     public VmPipeSessionImpl(IoService service, IoServiceConfig serviceConfig,
-            IoServiceListenerSupport serviceListeners, Object lock,
+            IoServiceListenerSupport serviceListeners,
             SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
         this.service = service;
         this.serviceConfig = serviceConfig;
         this.serviceListeners = serviceListeners;
-        this.lock = lock;
+        this.lock = new ReentrantLock();
         this.localAddress = localAddress;
         this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
         this.handler = handler;
@@ -137,14 +140,6 @@
         this.filterChain.fireFilterWrite(this, writeRequest);
     }
 
-    public int getScheduledWriteRequests() {
-        return 0;
-    }
-
-    public int getScheduledWriteBytes() {
-        return 0;
-    }
-
     public TransportType getTransportType() {
         return TransportType.VM_PIPE;
     }
@@ -184,5 +179,9 @@
                 }
             }
         }
+    }
+    
+    Lock getLock() {
+        return lock;
     }
 }

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java Fri Sep 14 03:04:45 2007
@@ -25,9 +25,6 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilter.WriteRequest;
-
 /**
  * A unbounded circular queue.
  * 
@@ -153,116 +150,6 @@
         return size;
     }
     
-    /**
-     * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
-     * in this queue.
-     */
-    public int byteSize() {
-        if (isEmpty()) {
-            return 0;
-        }
-
-        int byteSize = 0;
-
-        if (first < last) {
-            for (int i = first; i < last; i++) {
-                if (items[i] instanceof ByteBuffer) {
-                    byteSize += ((ByteBuffer) items[i]).remaining();
-                } else if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        byteSize += ((ByteBuffer) message).remaining();
-                    }
-                }
-            }
-        } else {
-            for (int i = first; i < items.length; i++) {
-                if (items[i] instanceof ByteBuffer) {
-                    byteSize += ((ByteBuffer) items[i]).remaining();
-                } else if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        byteSize += ((ByteBuffer) message).remaining();
-                    }
-                }
-            }
-            for (int i = last - 1; i >= 0; i--) {
-                if (items[i] instanceof ByteBuffer) {
-                    byteSize += ((ByteBuffer) items[i]).remaining();
-                } else if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        byteSize += ((ByteBuffer) message).remaining();
-                    }
-                }
-            }
-        }
-
-        return byteSize;
-    }
-    
-    public int messageSize() {
-        if (isEmpty()) {
-            return 0;
-        }
-
-        int messageSize = 0;
-
-        if (first < last) {
-            for (int i = first; i < last; i++) {
-                if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        if (((ByteBuffer) message).hasRemaining()) {
-                            messageSize ++;
-                        }
-                    } else {
-                        messageSize ++;
-                    }
-                } else if (items[i] instanceof ByteBuffer) {
-                    if (((ByteBuffer) items[i]).hasRemaining()) {
-                        messageSize ++;
-                    }
-                }
-            }
-        } else {
-            for (int i = first; i < items.length; i++) {
-                if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        if (((ByteBuffer) message).hasRemaining()) {
-                            messageSize ++;
-                        }
-                    } else {
-                        messageSize ++;
-                    }
-                } else if (items[i] instanceof ByteBuffer) {
-                    if (((ByteBuffer) items[i]).hasRemaining()) {
-                        messageSize ++;
-                    }
-                }
-            }
-            for (int i = last - 1; i >= 0; i--) {
-                if (items[i] instanceof WriteRequest) {
-                    Object message = ((WriteRequest) items[i]).getMessage();
-                    if (message instanceof ByteBuffer) {
-                        if (((ByteBuffer) message).hasRemaining()) {
-                            messageSize ++;
-                        }
-                    } else {
-                        messageSize ++;
-                    }
-                } else if (items[i] instanceof ByteBuffer) {
-                    if (((ByteBuffer) items[i]).hasRemaining()) {
-                        messageSize ++;
-                    }
-                }
-            }
-        }
-
-        return messageSize;
-    }
-
     public String toString() {
         return "first=" + first + ", last=" + last + ", size=" + size
                 + ", mask = " + mask;

Modified: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java (original)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java Fri Sep 14 03:04:45 2007
@@ -75,7 +75,7 @@
         future.join();
         IoSession session = future.getSession();
 
-        // We wait for the sessionCreated() event is fired becayse we cannot guarentee that
+        // We wait until the sessionCreated() event is fired because we cannot guarantee that
         // it is invoked already.
         while (session.getAttribute("lock") == null) {
             Thread.yield();

Added: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?rev=575603&view=auto
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (added)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Fri Sep 14 03:04:45 2007
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author Apache Mina Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeSessionCrossCommunicationTest extends TestCase {
+    public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
+        final VmPipeAddress address = new VmPipeAddress( 1 );
+        final IoConnector connector = new VmPipeConnector();
+        final AtomicReference c1 = new AtomicReference();
+        final CountDownLatch latch = new CountDownLatch( 1 );
+        final CountDownLatch messageCount = new CountDownLatch( 2 );
+        IoAcceptor acceptor = new VmPipeAcceptor();
+
+        acceptor.bind( address, new IoHandlerAdapter() {
+            public void messageReceived( IoSession session, Object message ) throws Exception {
+                System.out.println( Thread.currentThread().getName() + ": " + message );
+
+                if ( "start".equals( message ) ) {
+                    session.write( "open new" );
+                } else if ( "re-use c1".equals( message ) ) {
+                    session.write( "tell me something on c1 now" );
+                } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+                    messageCount.countDown();
+                } else {
+                    fail( "unexpected message received " + message );
+                }
+            }
+        } );
+
+        connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+
+        ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+            public void messageReceived( IoSession session, Object message ) throws Exception {
+                System.out.println( Thread.currentThread().getName() + ": " + message );
+                
+                if ( "open new".equals( message ) ) {
+                    System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+
+                    ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+                        public void sessionOpened( IoSession session ) throws Exception {
+                            session.write( "re-use c1" );
+                        }
+
+                        public void messageReceived( IoSession session, Object message ) throws Exception {
+                            System.out.println( Thread.currentThread().getName() + ": " + message );
+
+                            if ( "tell me something on c1 now".equals( message ) ) {
+                                latch.countDown();
+                                ((IoSession) c1.get()).write( "please don't deadlock via c1" );
+                            } else {
+                                fail( "unexpected message received " + message );
+                            }
+                        }
+                    } );
+
+                    c2Future.join();
+
+                    latch.await();
+
+                    c2Future.getSession().write( "please don't deadlock via c2" );
+                } else {
+                    fail( "unexpected message received " + message );
+                }
+            }
+        } );
+
+        future.join();
+
+        c1.set( future.getSession() );
+        ((IoSession) c1.get()).write( "start" );
+
+        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+        while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+            long[] threads = threadMXBean.findMonitorDeadlockedThreads();
+
+            if ( null != threads ) {
+                StringBuffer sb = new StringBuffer( 256 );
+                ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
+
+                for (int i = 0; i < infos.length; i ++) {
+                    ThreadInfo info = infos[i];
+                    sb.append( info.getThreadName() )
+                        .append( " blocked on " )
+                        .append( info.getLockName() )
+                        .append( " owned by " )
+                        .append( info.getLockOwnerName() )
+                        .append( "\n" );
+                }
+
+                for (int i = 0; i < infos.length; i ++) {
+                    ThreadInfo info = infos[i];
+                    sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
+                    StackTraceElement[] stackTrace = info.getStackTrace();
+                    for (int j = 0; j < stackTrace.length; j ++) {
+                        sb.append( "\t" ).append( stackTrace[j] ).append( "\n" );
+                    }
+                }
+
+                fail( "deadlocked! \n" + sb );
+            }
+        }
+
+        ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
+        acceptor.unbindAll();
+    }
+}

Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Sep 14 03:04:45 2007
@@ -25,9 +25,13 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
 import org.apache.mina.common.IoService;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.TrafficMask;
@@ -41,6 +45,16 @@
  * @version $Rev$, $Date$
  */
 public abstract class BaseIoSession implements IoSession {
+    
+    private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+        new IoFutureListener() {
+            public void operationComplete(IoFuture future) {
+                BaseIoSession s = (BaseIoSession) future.getSession();
+                s.scheduledWriteBytes.set(0);
+                s.scheduledWriteRequests.set(0);
+            }
+        };
+    
     private final Object lock = new Object();
 
     private final Map<String, Object> attributes = Collections
@@ -52,6 +66,12 @@
      * A future that will be set 'closed' when the connection is closed.
      */
     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
+    
+    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+    
+    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
+    private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
 
     private volatile boolean closing;
 
@@ -94,6 +114,7 @@
     protected BaseIoSession() {
         creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
                 .currentTimeMillis();
+        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
     }
 
     public boolean isConnected() {
@@ -107,6 +128,19 @@
     public CloseFuture getCloseFuture() {
         return closeFuture;
     }
+    
+    public boolean isScheduledForFlush() {
+        return scheduledForFlush.get();
+    }
+    
+    public boolean setScheduledForFlush(boolean flag) {
+        if (flag) {
+            return scheduledForFlush.compareAndSet(false, true);
+        } else {
+            scheduledForFlush.set(false);
+            return true;
+        }
+    }
 
     public CloseFuture close() {
         synchronized (lock) {
@@ -300,6 +334,14 @@
     public long getWrittenMessages() {
         return writtenMessages;
     }
+    
+    public int getScheduledWriteBytes() {
+        return scheduledWriteBytes.get();
+    }
+    
+    public int getScheduledWriteRequests() {
+        return scheduledWriteRequests.get();
+    }
 
     public void increaseReadBytes(int increment) {
         if (increment > 0) {
@@ -316,15 +358,26 @@
             lastWriteTime = System.currentTimeMillis();
             idleCountForBoth = 0;
             idleCountForWrite = 0;
+            
+            scheduledWriteBytes.addAndGet(-increment);
         }
     }
-
+    
     public void increaseReadMessages() {
         readMessages++;
     }
 
     public void increaseWrittenMessages() {
         writtenMessages++;
+        scheduledWriteRequests.decrementAndGet();
+    }
+    
+    public void increaseScheduledWriteBytes(int increment) {
+        scheduledWriteBytes.addAndGet(increment);
+    }
+
+    public void increaseScheduledWriteRequests() {
+        scheduledWriteRequests.incrementAndGet();
     }
 
     public long getCreationTime() {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Sep 14 03:04:45 2007
@@ -49,7 +49,13 @@
         ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
         buffer.mark();
 
-        s.getScheduledWriteBytesCounter().addAndGet(buffer.remaining());
+        int remaining = buffer.remaining();
+        if (remaining == 0) {
+            s.increaseScheduledWriteRequests();            
+        } else {
+            s.increaseScheduledWriteBytes(buffer.remaining());
+        }
+
         writeRequestQueue.add(writeRequest);
 
         if (session.getTrafficMask().isWritable()) {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -110,7 +110,7 @@
     }
 
     private boolean scheduleFlush(SocketSessionImpl session) {
-        if ( session.getInFlushQueue().compareAndSet( false, true ) ) {
+        if (session.setScheduledForFlush(true)) {
             flushingSessions.add(session);
 
             return true;
@@ -306,7 +306,7 @@
             if (session == null)
                 break;
 
-            session.getInFlushQueue().set( false );
+            session.setScheduledForFlush(false);
 
             if (!session.isConnected()) {
                 releaseWriteBuffers(session);
@@ -328,7 +328,7 @@
 
             try {
                 boolean flushedAll = doFlush(session);
-                if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.getInFlushQueue().get()) {
+                if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
                     scheduleFlush( session );
                 }
             } catch (IOException e) {
@@ -342,11 +342,10 @@
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        while ((req = writeRequestQueue.poll()) != null) {
+        if ((req = writeRequestQueue.poll()) != null) {
             ByteBuffer buf = (ByteBuffer) req.getMessage();
             try {
                 buf.release();
-                session.getScheduledWriteBytesCounter().addAndGet( -buf.remaining() );
             } catch (IllegalStateException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             } finally {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -25,10 +25,7 @@
 import java.nio.channels.SocketChannel;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -37,7 +34,7 @@
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportType;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.common.support.BaseIoSessionConfig;
 import org.apache.mina.common.support.IoServiceListenerSupport;
@@ -73,10 +70,6 @@
 
     private final IoServiceListenerSupport serviceListeners;
 
-    private final AtomicBoolean inFlushQueue = new AtomicBoolean( false );
-
-    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
-
     private SelectionKey key;
 
     private int readBufferSize = 1024;
@@ -167,43 +160,6 @@
         return writeRequestQueue;
     }
 
-    public int getScheduledWriteRequests() {
-        int size = 0;
-        for (WriteRequest request : writeRequestQueue) {
-            Object message = request.getMessage();
-            if (message instanceof ByteBuffer) {
-                if (((ByteBuffer) message).hasRemaining()) {
-                    size ++;
-                }
-            } else {
-                size ++;
-            }
-        }
-
-        return size;
-    }
-
-    /**
-     * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
-     * in the writeRequestQueue queue.
-     *
-     * @throws ClassCastException if an element is not a {@link ByteBuffer}
-     */
-    public int getScheduledWriteBytes() {
-        return scheduledWriteBytes.get();
-    }
-
-    @Override
-    public void increaseWrittenBytes( int increment ) {
-        super.increaseWrittenBytes( increment );
-
-        scheduledWriteBytes.addAndGet( -increment );
-    }
-
-    AtomicInteger getScheduledWriteBytesCounter() {
-        return scheduledWriteBytes;
-    }
-
     @Override
     protected void write0(WriteRequest writeRequest) {
         filterChain.fireFilterWrite(this, writeRequest);
@@ -236,10 +192,6 @@
 
     void setReadBufferSize(int readBufferSize) {
         this.readBufferSize = readBufferSize;
-    }
-
-    AtomicBoolean getInFlushQueue() {
-        return inFlushQueue;
     }
 
     private class SessionConfigImpl extends BaseIoSessionConfig implements

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Sep 14 03:04:45 2007
@@ -286,18 +286,24 @@
     }
 
     public void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = this.selector;
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
     public void closeSession(DatagramSessionImpl session) {
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        flushingSessions.add(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     private class Worker implements Runnable {
@@ -409,23 +415,28 @@
             if (session == null)
                 break;
 
+            session.setScheduledForFlush(false);
+
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -463,7 +474,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     // pop and fire event
                     writeRequestQueue.poll();
@@ -476,6 +487,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Sep 14 03:04:45 2007
@@ -225,15 +225,21 @@
     }
 
     public void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = this.selector;
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        flushingSessions.add(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     public void updateTrafficMask(DatagramSessionImpl session) {
@@ -400,23 +406,28 @@
             if (session == null)
                 break;
 
+            session.setScheduledForFlush(false);
+
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -449,7 +460,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
     
@@ -464,6 +475,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -45,14 +45,20 @@
 
         // SocketIoProcessor.doFlush() will reset it after write is finished
         // because the buffer will be passed with messageSent event.
-        ((ByteBuffer) writeRequest.getMessage()).mark();
-        synchronized (writeRequestQueue) {
-            writeRequestQueue.add(writeRequest);
-            if (writeRequestQueue.size() == 1
-                    && session.getTrafficMask().isWritable()) {
-                // Notify DatagramService only when writeRequestQueue was empty.
-                s.getManagerDelegate().flushSession(s);
-            }
+        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+        buffer.mark();
+        
+        int remaining = buffer.remaining();
+        if (remaining == 0) {
+            s.increaseScheduledWriteRequests();            
+        } else {
+            s.increaseScheduledWriteBytes(buffer.remaining());
+        }
+
+        writeRequestQueue.add(writeRequest);
+        
+        if (session.getTrafficMask().isWritable()) {
+            s.getManagerDelegate().flushSession(s);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -27,8 +27,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.mina.common.BroadcastIoSession;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -38,6 +36,7 @@
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportType;
 import org.apache.mina.common.WriteFuture;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
 import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
@@ -170,34 +169,6 @@
     @Override
     protected void write0(WriteRequest writeRequest) {
         filterChain.fireFilterWrite(this, writeRequest);
-    }
-
-    public int getScheduledWriteRequests() {
-        int size = 0;
-        synchronized (writeRequestQueue) {
-            for (WriteRequest request : writeRequestQueue) {
-                Object message = request.getMessage();
-                if (message instanceof ByteBuffer) {
-                    if (((ByteBuffer) message).hasRemaining()) {
-                        size ++;
-                    }
-                } else {
-                    size ++;
-                }
-            }
-        }
-
-        return size;
-    }
-
-    public int getScheduledWriteBytes() {
-        int byteSize = 0;
-
-        for (WriteRequest request : writeRequestQueue) {
-            byteSize += ((ByteBuffer) request.getMessage()).remaining();
-        }
-
-        return byteSize;
     }
 
     public TransportType getTransportType() {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -180,6 +180,11 @@
                         messageCopy = wb;
                     }
 
+                    // Increase the scheduledWrite* counters up front so that they do not
+                    // become negative as a side effect of the written-counter updates below.
+                    s.increaseScheduledWriteBytes(byteCount);
+                    s.increaseScheduledWriteRequests();
+                    
                     s.increaseWrittenBytes(byteCount);
                     s.increaseWrittenMessages();
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -144,14 +144,6 @@
         this.filterChain.fireFilterWrite(this, writeRequest);
     }
 
-    public int getScheduledWriteRequests() {
-        return 0;
-    }
-
-    public int getScheduledWriteBytes() {
-        return 0;
-    }
-
     public TransportType getTransportType() {
         return TransportType.VM_PIPE;
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Sep 14 03:04:45 2007
@@ -29,6 +29,9 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -38,6 +41,16 @@
  * @version $Rev$, $Date$
  */
 public abstract class AbstractIoSession implements IoSession {
+
+    private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+        new IoFutureListener() {
+            public void operationComplete(IoFuture future) {
+                AbstractIoSession s = (AbstractIoSession) future.getSession();
+                s.scheduledWriteBytes.set(0);
+                s.scheduledWriteMessages.set(0);
+            }
+        };
+    
     private final Object lock = new Object();
 
     private final Map<String, Object> attributes = Collections
@@ -50,6 +63,12 @@
      */
     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
 
+    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+    
+    private final AtomicLong scheduledWriteBytes = new AtomicLong();
+
+    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
+
     private volatile boolean closing;
 
     private TrafficMask trafficMask = TrafficMask.ALL;
@@ -82,6 +101,7 @@
     protected AbstractIoSession() {
         creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
                 .currentTimeMillis();
+        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
     }
 
     public boolean isConnected() {
@@ -96,6 +116,19 @@
         return closeFuture;
     }
 
+    public boolean isScheduledForFlush() {
+        return scheduledForFlush.get();
+    }
+    
+    public boolean setScheduledForFlush(boolean flag) {
+        if (flag) {
+            return scheduledForFlush.compareAndSet(false, true);
+        } else {
+            scheduledForFlush.set(false);
+            return true;
+        }
+    }
+
     public CloseFuture close() {
         synchronized (lock) {
             if (isClosing()) {
@@ -360,6 +393,14 @@
     public long getWrittenMessages() {
         return writtenMessages;
     }
+    
+    public long getScheduledWriteBytes() {
+        return scheduledWriteBytes.get();
+    }
+    
+    public int getScheduledWriteMessages() {
+        return scheduledWriteMessages.get();
+    }
 
     public void increaseReadBytes(int increment) {
         if (increment > 0) {
@@ -376,6 +417,8 @@
             lastWriteTime = System.currentTimeMillis();
             idleCountForBoth = 0;
             idleCountForWrite = 0;
+            
+            scheduledWriteBytes.addAndGet(-increment);
         }
     }
 
@@ -385,6 +428,15 @@
 
     public void increaseWrittenMessages() {
         writtenMessages++;
+        scheduledWriteMessages.decrementAndGet();
+    }
+
+    public void increaseScheduledWriteBytes(int increment) {
+        scheduledWriteBytes.addAndGet(increment);
+    }
+
+    public void increaseScheduledWriteMessages() {
+        scheduledWriteMessages.incrementAndGet();
     }
 
     public long getCreationTime() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Fri Sep 14 03:04:45 2007
@@ -262,15 +262,21 @@
     }
 
     void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = this.selector;
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        flushingSessions.add(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     private class Worker implements Runnable {
@@ -361,24 +367,29 @@
             if (session == null) {
                 break;
             }
+            
+            session.setScheduledForFlush(false);
 
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -421,7 +432,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -439,6 +450,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Fri Sep 14 03:04:45 2007
@@ -170,15 +170,21 @@
     }
 
     void flushSession(DatagramSessionImpl session) {
-        scheduleFlush(session);
-        Selector selector = this.selector;
-        if (selector != null) {
-            selector.wakeup();
+        if (scheduleFlush(session)) {
+            Selector selector = this.selector;
+            if (selector != null) {
+                selector.wakeup();
+            }
         }
     }
 
-    private void scheduleFlush(DatagramSessionImpl session) {
-        flushingSessions.add(session);
+    private boolean scheduleFlush(DatagramSessionImpl session) {
+        if (session.setScheduledForFlush(true)) {
+            flushingSessions.add(session);
+            return true;
+        } else {
+            return false;
+        }
     }
 
     void updateTrafficMask(DatagramSessionImpl session) {
@@ -315,24 +321,29 @@
             if (session == null) {
                 break;
             }
+            
+            session.setScheduledForFlush(false);
 
             try {
-                flush(session);
+                boolean flushedAll = flush(session);
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+                    scheduleFlush(session);
+                }
             } catch (IOException e) {
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
-    private void flush(DatagramSessionImpl session) throws IOException {
+    private boolean flush(DatagramSessionImpl session) throws IOException {
         // Clear OP_WRITE
         SelectionKey key = session.getSelectionKey();
         if (key == null) {
             scheduleFlush(session);
-            return;
+            return false;
         }
         if (!key.isValid()) {
-            return;
+            return false;
         }
         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -369,7 +380,7 @@
                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
                     // Kernel buffer is full or wrote too much
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
+                    return false;
                 } else {
                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
@@ -387,6 +398,8 @@
         } finally {
             session.increaseWrittenBytes(writtenBytes);
         }
+        
+        return true;
     }
 
     private void registerNew() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -43,16 +43,22 @@
     protected void doWrite(IoSession session, WriteRequest writeRequest) {
         DatagramSessionImpl s = (DatagramSessionImpl) session;
         Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
-        ((ByteBuffer) writeRequest.getMessage()).mark();
 
-        int writeRequestQueueSize;
+        ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+        buffer.mark();
+        
+        int remaining = buffer.remaining();
+        if (remaining == 0) {
+            s.increaseScheduledWriteMessages();            
+        } else {
+            s.increaseScheduledWriteBytes(buffer.remaining());
+        }
+
         synchronized (writeRequestQueue) {
             writeRequestQueue.add(writeRequest);
-            writeRequestQueueSize = writeRequestQueue.size();
         }
-
-        if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
-            // Notify SocketIoProcessor only when writeRequestQueue was empty.
+        
+        if (session.getTrafficMask().isWritable()) {
             IoService service = s.getService();
             if (service instanceof DatagramAcceptor) {
                 ((DatagramAcceptor) service).flushSession(s);

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -170,37 +170,6 @@
         filterChain.fireFilterWrite(this, writeRequest);
     }
 
-    public int getScheduledWriteMessages() {
-        int size = 0;
-        synchronized (writeRequestQueue) {
-            for (WriteRequest request : writeRequestQueue) {
-                Object message = request.getMessage();
-                if (message instanceof ByteBuffer) {
-                    if (((ByteBuffer) message).hasRemaining()) {
-                        size++;
-                    }
-                } else {
-                    size++;
-                }
-            }
-        }
-
-        return size;
-    }
-
-    public long getScheduledWriteBytes() {
-        int size = 0;
-        synchronized (writeRequestQueue) {
-            for (Object o : writeRequestQueue) {
-                if (o instanceof ByteBuffer) {
-                    size += ((ByteBuffer) o).remaining();
-                }
-            }
-        }
-
-        return size;
-    }
-
     public InetSocketAddress getRemoteAddress() {
         return remoteAddress;
     }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -125,7 +125,7 @@
     }
 
     private boolean scheduleFlush(SocketSessionImpl session) {
-        if (session.getInFlushQueue().compareAndSet(false, true)) {
+        if (session.setScheduledForFlush(true)) {
             flushingSessions.add(session);
 
             return true;
@@ -334,7 +334,7 @@
                 break;
             }
 
-            session.getInFlushQueue().set(false);
+            session.setScheduledForFlush(false);
 
             if (!session.isConnected()) {
                 clearWriteRequestQueue(session);
@@ -356,7 +356,7 @@
 
             try {
                 boolean flushedAll = doFlush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.getInFlushQueue().get()) {
+                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
                     scheduleFlush(session);
                 }
             } catch (IOException e) {
@@ -370,12 +370,10 @@
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
 
-        while ((req = writeRequestQueue.poll()) != null) {
+        if ((req = writeRequestQueue.poll()) != null) {
             Object m = req.getMessage();
             if (m instanceof ByteBuffer) {
                 ByteBuffer buf = (ByteBuffer) req.getMessage();
-
-                session.getScheduledWriteBytesCounter().addAndGet(-buf.remaining());
 
                 // The first unwritten empty buffer must be
                 // forwarded to the filter chain.

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -25,8 +25,6 @@
 import java.nio.channels.SocketChannel;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
@@ -69,10 +67,6 @@
 
     private final IoHandler handler;
 
-    private final AtomicBoolean inFlushQueue = new AtomicBoolean(false);
-
-    private final AtomicLong scheduledWriteBytes = new AtomicLong();
-
     private SelectionKey key;
 
     private int readBufferSize = 1024;
@@ -149,21 +143,6 @@
         return size;
     }
 
-    public long getScheduledWriteBytes() {
-        return scheduledWriteBytes.get();
-    }
-
-    @Override
-    public void increaseWrittenBytes(long increment) {
-        super.increaseWrittenBytes(increment);
-
-        scheduledWriteBytes.addAndGet(-increment);
-    }
-
-    AtomicLong getScheduledWriteBytesCounter() {
-        return scheduledWriteBytes;
-    }
-
     @Override
     protected void write0(WriteRequest writeRequest) {
         filterChain.fireFilterWrite(this, writeRequest);
@@ -195,17 +174,18 @@
         this.readBufferSize = readBufferSize;
     }
 
-    AtomicBoolean getInFlushQueue() {
-        return inFlushQueue;
-    }
-    
     void queueWriteRequest(WriteRequest writeRequest) {
         if (writeRequest.getMessage() instanceof ByteBuffer) {
             ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
             // SocketIoProcessor.doFlush() will reset it after write is finished
             // because the buffer will be passed with messageSent event.
             buffer.mark();
-            scheduledWriteBytes.addAndGet(buffer.remaining());
+            int remaining = buffer.remaining();
+            if (remaining == 0) {
+                increaseScheduledWriteMessages();            
+            } else {
+                increaseScheduledWriteBytes(buffer.remaining());
+            }
         }
 
         writeRequestQueue.add(writeRequest);

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -182,6 +182,11 @@
                         messageCopy = wb;
                     }
 
+                    // Increase the scheduledWrite* counters first: increaseWrittenBytes() and
+                    // increaseWrittenMessages() below decrement them and would make them negative.
+                    s.increaseScheduledWriteBytes(byteCount);
+                    s.increaseScheduledWriteMessages();
+                    
                     s.increaseWrittenBytes(byteCount);
                     s.increaseWrittenMessages();
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -144,14 +144,6 @@
         this.filterChain.fireFilterWrite(this, writeRequest);
     }
 
-    public int getScheduledWriteMessages() {
-        return 0;
-    }
-
-    public long getScheduledWriteBytes() {
-        return 0;
-    }
-
     public VmPipeAddress getRemoteAddress() {
         return remoteAddress;
     }