You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/08/18 06:39:48 UTC

svn commit: r567228 - in /mina: branches/1.0/core/src/main/java/org/apache/mina/common/support/ branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/ branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/ branches...

Author: trustin
Date: Fri Aug 17 21:39:46 2007
New Revision: 567228

URL: http://svn.apache.org/viewvc?view=rev&rev=567228
Log:
Fixed issue: DIRMINA-305 (SocketIoProcessor is biased to write operations)
* Restricted the flush loop to write not more than twice of the socket send buffer size


Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Aug 17 21:39:46 2007
@@ -304,17 +304,21 @@
     }
 
     public void increaseReadBytes(int increment) {
-        readBytes += increment;
-        lastReadTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForRead = 0;
+        if (increment > 0) {
+            readBytes += increment;
+            lastReadTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForRead = 0;
+        }
     }
 
     public void increaseWrittenBytes(int increment) {
-        writtenBytes += increment;
-        lastWriteTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForWrite = 0;
+        if (increment > 0) {
+            writtenBytes += increment;
+            lastWriteTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForWrite = 0;
+        }
     }
 
     public void increaseReadMessages() {

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -405,41 +405,44 @@
         SocketChannel ch = session.getChannel();
         Queue writeRequestQueue = session.getWriteRequestQueue();
 
-        for (;;) {
-            WriteRequest req;
-
-            synchronized (writeRequestQueue) {
-                req = (WriteRequest) writeRequestQueue.first();
-            }
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req;
+    
                 synchronized (writeRequestQueue) {
-                    writeRequestQueue.pop();
+                    req = (WriteRequest) writeRequestQueue.first();
                 }
-
-                session.increaseWrittenMessages();
-
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            if (key.isWritable()) {
-                int writtenBytes = ch.write(buf.buf());
-                if (writtenBytes > 0) {
-                    session.increaseWrittenBytes(writtenBytes);
+    
+                if (req == null)
+                    break;
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.pop();
+                    }
+    
+                    session.increaseWrittenMessages();
+    
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+    
+                if (key.isWritable()) {
+                    writtenBytes += ch.write(buf.buf());
+                }
+    
+                if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much.
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
                 }
             }
-
-            if (buf.hasRemaining()) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                break;
-            }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Aug 17 21:39:46 2007
@@ -425,65 +425,71 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for (;;) {
-            synchronized (writeRequestQueue) {
-                req = (WriteRequest) writeRequestQueue.first();
-            }
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req;
                 synchronized (writeRequestQueue) {
-                    writeRequestQueue.pop();
+                    req = (WriteRequest) writeRequestQueue.first();
                 }
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                ((DatagramFilterChain) session.getFilterChain())
-                        .fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            SocketAddress destination = req.getDestination();
-            if (destination == null) {
-                destination = session.getRemoteAddress();
-            }
-
-            int writtenBytes = ch.send(buf.buf(), destination);
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                synchronized (writeRequestQueue) {
-                    writeRequestQueue.pop();
+    
+                if (req == null)
+                    break;
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.pop();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    ((DatagramFilterChain) session.getFilterChain())
+                            .fireMessageSent(session, req);
+                    continue;
+                }
+    
+                SocketAddress destination = req.getDestination();
+                if (destination == null) {
+                    destination = session.getRemoteAddress();
+                }
+    
+                int localWrittenBytes = ch.send(buf.buf(), destination);
+                writtenBytes += localWrittenBytes;
+    
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.pop();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Aug 17 21:39:46 2007
@@ -428,59 +428,67 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for (;;) {
-            synchronized (writeRequestQueue) {
-                req = (WriteRequest) writeRequestQueue.first();
-            }
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req;
                 synchronized (writeRequestQueue) {
-                    writeRequestQueue.pop();
+                    req = (WriteRequest) writeRequestQueue.first();
                 }
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            int writtenBytes = ch.write(buf.buf());
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                synchronized (writeRequestQueue) {
-                    writeRequestQueue.pop();
+    
+                if (req == null)
+                    break;
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.pop();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+    
+                int localWrittenBytes = ch.write(buf.buf());
+                writtenBytes += localWrittenBytes;
+    
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+    
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.pop();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Aug 17 21:39:46 2007
@@ -304,17 +304,21 @@
     }
 
     public void increaseReadBytes(int increment) {
-        readBytes += increment;
-        lastReadTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForRead = 0;
+        if (increment > 0) {
+            readBytes += increment;
+            lastReadTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForRead = 0;
+        }
     }
 
     public void increaseWrittenBytes(int increment) {
-        writtenBytes += increment;
-        lastWriteTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForWrite = 0;
+        if (increment > 0) {
+            writtenBytes += increment;
+            lastWriteTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForWrite = 0;
+        }
     }
 
     public void increaseReadMessages() {

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -361,35 +361,38 @@
         SocketChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        for (;;) {
-            WriteRequest req = writeRequestQueue.peek();
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                writeRequestQueue.poll();
-
-                session.increaseWrittenMessages();
-
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            if (key.isWritable()) {
-                int writtenBytes = ch.write(buf.buf());
-                if (writtenBytes > 0) {
-                    session.increaseWrittenBytes(writtenBytes);
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req = writeRequestQueue.peek();
+    
+                if (req == null)
+                    break;
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    writeRequestQueue.poll();
+    
+                    session.increaseWrittenMessages();
+    
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+    
+                if (key.isWritable()) {
+                    writtenBytes += ch.write(buf.buf());
+                }
+    
+                if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much.
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
                 }
             }
-
-            if (buf.hasRemaining()) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                break;
-            }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Aug 17 21:39:46 2007
@@ -39,12 +39,12 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionRecycler;
 import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoAcceptor;
 import org.apache.mina.common.support.IoServiceListenerSupport;
 import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
@@ -418,57 +418,63 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        for (;;) {
-            WriteRequest req = writeRequestQueue.peek();
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
-                writeRequestQueue.poll();
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            SocketAddress destination = req.getDestination();
-            if (destination == null) {
-                destination = session.getRemoteAddress();
-            }
-
-            int writtenBytes = ch.send(buf.buf(), destination);
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                writeRequestQueue.poll();
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req = writeRequestQueue.peek();
+    
+                if (req == null)
+                    break;
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    writeRequestQueue.poll();
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+    
+                SocketAddress destination = req.getDestination();
+                if (destination == null) {
+                    destination = session.getRemoteAddress();
+                }
+    
+                int localWrittenBytes = ch.send(buf.buf(), destination);
+                writtenBytes += localWrittenBytes;
+    
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    // pop and fire event
+                    writeRequestQueue.poll();
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                }
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Aug 17 21:39:46 2007
@@ -409,52 +409,60 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        for (;;) {
-            WriteRequest req = writeRequestQueue.peek();
-
-            if (req == null)
-                break;
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
-                writeRequestQueue.poll();
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            int writtenBytes = ch.write(buf.buf());
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                writeRequestQueue.poll();
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
+        int writtenBytes = 0;
+        int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req = writeRequestQueue.peek();
+        
+                if (req == null)
+                    break;
+        
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    writeRequestQueue.poll();
+        
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+        
+                int localWrittenBytes = ch.write(buf.buf());
+                writtenBytes += localWrittenBytes;
+    
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+    
+                    // pop and fire event
+                    writeRequestQueue.poll();
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                }
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Aug 17 21:39:46 2007
@@ -364,17 +364,21 @@
     }
 
     public void increaseReadBytes(int increment) {
-        readBytes += increment;
-        lastReadTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForRead = 0;
+        if (increment > 0) {
+            readBytes += increment;
+            lastReadTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForRead = 0;
+        }
     }
 
     public void increaseWrittenBytes(long increment) {
-        writtenBytes += increment;
-        lastWriteTime = System.currentTimeMillis();
-        idleCountForBoth = 0;
-        idleCountForWrite = 0;
+        if (increment > 0) {
+            writtenBytes += increment;
+            lastWriteTime = System.currentTimeMillis();
+            idleCountForBoth = 0;
+            idleCountForWrite = 0;
+        }
     }
 
     public void increaseReadMessages() {

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Fri Aug 17 21:39:46 2007
@@ -371,66 +371,73 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for (;;) {
-            synchronized (writeRequestQueue) {
-                req = writeRequestQueue.peek();
-            }
-
-            if (req == null) {
-                break;
-            }
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
+        int writtenBytes = 0;
+        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req;
                 synchronized (writeRequestQueue) {
-                    writeRequestQueue.poll();
+                    req = writeRequestQueue.peek();
                 }
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                ((DatagramFilterChain) session.getFilterChain())
-                        .fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            SocketAddress destination = req.getDestination();
-            if (destination == null) {
-                destination = session.getRemoteAddress();
-            }
-
-            int writtenBytes = ch.send(buf.buf(), destination);
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                synchronized (writeRequestQueue) {
-                    writeRequestQueue.poll();
+    
+                if (req == null) {
+                    break;
+                }
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    ((DatagramFilterChain) session.getFilterChain())
+                            .fireMessageSent(session, req);
+                    continue;
+                }
+    
+                SocketAddress destination = req.getDestination();
+                if (destination == null) {
+                    destination = session.getRemoteAddress();
+                }
+    
+                int localWrittenBytes = ch.send(buf.buf(), destination);
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+    
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
+    
+                    writtenBytes += localWrittenBytes;
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Fri Aug 17 21:39:46 2007
@@ -325,60 +325,67 @@
     }
 
     private void flush(DatagramSessionImpl session) throws IOException {
-        DatagramChannel ch = session.getChannel();
+        // Clear OP_WRITE
+        SelectionKey key = session.getSelectionKey();
+        if (key == null) {
+            scheduleFlush(session);
+            return;
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
+        DatagramChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for (;;) {
-            synchronized (writeRequestQueue) {
-                req = writeRequestQueue.peek();
-            }
-
-            if (req == null) {
-                break;
-            }
-
-            ByteBuffer buf = (ByteBuffer) req.getMessage();
-            if (buf.remaining() == 0) {
-                // pop and fire event
+        int writtenBytes = 0;
+        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+        try {
+            for (;;) {
+                WriteRequest req;
                 synchronized (writeRequestQueue) {
-                    writeRequestQueue.poll();
+                    req = writeRequestQueue.peek();
                 }
-
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if (key == null) {
-                scheduleFlush(session);
-                break;
-            }
-            if (!key.isValid()) {
-                continue;
-            }
-
-            int writtenBytes = ch.write(buf.buf());
-
-            if (writtenBytes == 0) {
-                // Kernel buffer is full
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } else if (writtenBytes > 0) {
-                key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                // pop and fire event
-                synchronized (writeRequestQueue) {
-                    writeRequestQueue.poll();
+    
+                if (req == null) {
+                    break;
+                }
+    
+                ByteBuffer buf = (ByteBuffer) req.getMessage();
+                if (buf.remaining() == 0) {
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
+    
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
+                    continue;
+                }
+    
+                int localWrittenBytes = ch.write(buf.buf());
+                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+                    // Kernel buffer is full or wrote too much
+                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                    break;
+                } else {
+                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+    
+                    // pop and fire event
+                    synchronized (writeRequestQueue) {
+                        writeRequestQueue.poll();
+                    }
+    
+                    writtenBytes += localWrittenBytes;
+                    session.increaseWrittenMessages();
+                    buf.reset();
+                    session.getFilterChain().fireMessageSent(session, req);
                 }
-
-                session.increaseWrittenBytes(writtenBytes);
-                session.increaseWrittenMessages();
-                buf.reset();
-                session.getFilterChain().fireMessageSent(session, req);
             }
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }
 

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -388,74 +388,76 @@
         SocketChannel ch = session.getChannel();
         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
 
-        for (;;) {
-            WriteRequest req;
-
-            // Check for pending writes.
-            synchronized (writeRequestQueue) {
-                req = writeRequestQueue.peek();
-            }
-
-            if (req == null) {
-                break;
-            }
-
-            Object message = req.getMessage();
-            if (message instanceof FileRegion) {
-                FileRegion region = (FileRegion) message; 
-
-                if (region.getCount() <= 0) {
-                    // File has been sent, remove from queue
-                    synchronized (writeRequestQueue) {
-                        writeRequestQueue.poll();
-                    }
-                    session.increaseWrittenMessages();
-                    session.getFilterChain().fireMessageSent(session, req);
-                    continue;
+        int writtenBytes = 0;
+        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+        try {
+            do {
+                WriteRequest req;
+    
+                // Check for pending writes.
+                synchronized (writeRequestQueue) {
+                    req = writeRequestQueue.peek();
                 }
-                
-                if (key.isWritable()) {
-                    long writtenBytes = region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
-                    region.setPosition(region.getPosition() + writtenBytes);
-                    
-                    if (writtenBytes > 0) {
-                        session.increaseWrittenBytes(writtenBytes);
-                    }
-                }
-                
-                if (region.getCount() > 0) {
-                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+    
+                if (req == null) {
                     break;
                 }
-
-            } else {
-                ByteBuffer buf = (ByteBuffer) message;
-                if (buf.remaining() == 0) {
-                    // Buffer has been completely sent, remove request form queue
-                    synchronized (writeRequestQueue) {
-                        writeRequestQueue.poll();
-                    }
-
-                    session.increaseWrittenMessages();
-
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(session, req);
-                    continue;
-                }
-
-                if (key.isWritable()) {
-                    int writtenBytes = ch.write(buf.buf());
-                    if (writtenBytes > 0) {
-                        session.increaseWrittenBytes(writtenBytes);
+    
+                Object message = req.getMessage();
+                if (message instanceof FileRegion) {
+                    FileRegion region = (FileRegion) message; 
+    
+                    if (region.getCount() <= 0) {
+                        // File has been sent, remove from queue
+                        synchronized (writeRequestQueue) {
+                            writeRequestQueue.poll();
+                        }
+                        session.increaseWrittenMessages();
+                        session.getFilterChain().fireMessageSent(session, req);
+                        continue;
+                    }
+                    
+                    if (key.isWritable()) {
+                        long localWrittenBytes = 
+                            region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+                        region.setPosition(region.getPosition() + localWrittenBytes);
+                        writtenBytes += localWrittenBytes;
+                    }
+                    
+                    if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+                        // Kernel buffer is full or wrote too much.
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        break;
+                    }
+    
+                } else {
+                    ByteBuffer buf = (ByteBuffer) message;
+                    if (buf.remaining() == 0) {
+                        // Buffer has been completely sent, remove request form queue
+                        synchronized (writeRequestQueue) {
+                            writeRequestQueue.poll();
+                        }
+    
+                        session.increaseWrittenMessages();
+    
+                        buf.reset();
+                        session.getFilterChain().fireMessageSent(session, req);
+                        continue;
+                    }
+    
+                    if (key.isWritable()) {
+                        writtenBytes += ch.write(buf.buf());
+                    }
+    
+                    if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+                        // Kernel buffer is full or wrote too much.
+                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                        break;
                     }
                 }
-
-                if (buf.hasRemaining()) {
-                    // Kernel buffer is full
-                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    break;
-                }
-            }
+            } while (writtenBytes < maxWrittenBytes);
+        } finally {
+            session.increaseWrittenBytes(writtenBytes);
         }
     }