You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/08/18 06:39:48 UTC
svn commit: r567228 - in /mina:
branches/1.0/core/src/main/java/org/apache/mina/common/support/
branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/
branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/
branches...
Author: trustin
Date: Fri Aug 17 21:39:46 2007
New Revision: 567228
URL: http://svn.apache.org/viewvc?view=rev&rev=567228
Log:
Fixed issue: DIRMINA-305 (SocketIoProcessor is biased to write operations)
* Restricted the flush loop to write not more than twice of the socket send buffer size
Modified:
mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Aug 17 21:39:46 2007
@@ -304,17 +304,21 @@
}
public void increaseReadBytes(int increment) {
- readBytes += increment;
- lastReadTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForRead = 0;
+ if (increment > 0) {
+ readBytes += increment;
+ lastReadTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForRead = 0;
+ }
}
public void increaseWrittenBytes(int increment) {
- writtenBytes += increment;
- lastWriteTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForWrite = 0;
+ if (increment > 0) {
+ writtenBytes += increment;
+ lastWriteTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForWrite = 0;
+ }
}
public void increaseReadMessages() {
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -405,41 +405,44 @@
SocketChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
- for (;;) {
- WriteRequest req;
-
- synchronized (writeRequestQueue) {
- req = (WriteRequest) writeRequestQueue.first();
- }
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req;
+
synchronized (writeRequestQueue) {
- writeRequestQueue.pop();
+ req = (WriteRequest) writeRequestQueue.first();
}
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- if (key.isWritable()) {
- int writtenBytes = ch.write(buf.buf());
- if (writtenBytes > 0) {
- session.increaseWrittenBytes(writtenBytes);
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if (key.isWritable()) {
+ writtenBytes += ch.write(buf.buf());
+ }
+
+ if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
}
}
-
- if (buf.hasRemaining()) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
- }
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Aug 17 21:39:46 2007
@@ -425,65 +425,71 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for (;;) {
- synchronized (writeRequestQueue) {
- req = (WriteRequest) writeRequestQueue.first();
- }
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req;
synchronized (writeRequestQueue) {
- writeRequestQueue.pop();
+ req = (WriteRequest) writeRequestQueue.first();
}
-
- session.increaseWrittenMessages();
- buf.reset();
- ((DatagramFilterChain) session.getFilterChain())
- .fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int writtenBytes = ch.send(buf.buf(), destination);
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.pop();
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ ((DatagramFilterChain) session.getFilterChain())
+ .fireMessageSent(session, req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = ch.send(buf.buf(), destination);
+ writtenBytes += localWrittenBytes;
+
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
}
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Aug 17 21:39:46 2007
@@ -428,59 +428,67 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for (;;) {
- synchronized (writeRequestQueue) {
- req = (WriteRequest) writeRequestQueue.first();
- }
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req;
synchronized (writeRequestQueue) {
- writeRequestQueue.pop();
+ req = (WriteRequest) writeRequestQueue.first();
}
-
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- int writtenBytes = ch.write(buf.buf());
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.pop();
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ int localWrittenBytes = ch.write(buf.buf());
+ writtenBytes += localWrittenBytes;
+
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.pop();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
}
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Aug 17 21:39:46 2007
@@ -304,17 +304,21 @@
}
public void increaseReadBytes(int increment) {
- readBytes += increment;
- lastReadTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForRead = 0;
+ if (increment > 0) {
+ readBytes += increment;
+ lastReadTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForRead = 0;
+ }
}
public void increaseWrittenBytes(int increment) {
- writtenBytes += increment;
- lastWriteTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForWrite = 0;
+ if (increment > 0) {
+ writtenBytes += increment;
+ lastWriteTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForWrite = 0;
+ }
}
public void increaseReadMessages() {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -361,35 +361,38 @@
SocketChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- for (;;) {
- WriteRequest req = writeRequestQueue.peek();
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- writeRequestQueue.poll();
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- if (key.isWritable()) {
- int writtenBytes = ch.write(buf.buf());
- if (writtenBytes > 0) {
- session.increaseWrittenBytes(writtenBytes);
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req = writeRequestQueue.peek();
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if (key.isWritable()) {
+ writtenBytes += ch.write(buf.buf());
+ }
+
+ if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
}
}
-
- if (buf.hasRemaining()) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
- }
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Aug 17 21:39:46 2007
@@ -39,12 +39,12 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoAcceptor;
import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.transport.socket.nio.DatagramAcceptorConfig;
@@ -418,57 +418,63 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- for (;;) {
- WriteRequest req = writeRequestQueue.peek();
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
- writeRequestQueue.poll();
-
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int writtenBytes = ch.send(buf.buf(), destination);
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- writeRequestQueue.poll();
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req = writeRequestQueue.peek();
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = ch.send(buf.buf(), destination);
+ writtenBytes += localWrittenBytes;
+
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ // pop and fire event
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ }
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Aug 17 21:39:46 2007
@@ -409,52 +409,60 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- for (;;) {
- WriteRequest req = writeRequestQueue.peek();
-
- if (req == null)
- break;
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
- writeRequestQueue.poll();
-
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- int writtenBytes = ch.write(buf.buf());
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- writeRequestQueue.poll();
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
+ int writtenBytes = 0;
+ int maxWrittenBytes = ((DatagramSessionConfig) session.getConfig()).getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req = writeRequestQueue.peek();
+
+ if (req == null)
+ break;
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ int localWrittenBytes = ch.write(buf.buf());
+ writtenBytes += localWrittenBytes;
+
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ // pop and fire event
+ writeRequestQueue.poll();
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ }
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Aug 17 21:39:46 2007
@@ -364,17 +364,21 @@
}
public void increaseReadBytes(int increment) {
- readBytes += increment;
- lastReadTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForRead = 0;
+ if (increment > 0) {
+ readBytes += increment;
+ lastReadTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForRead = 0;
+ }
}
public void increaseWrittenBytes(long increment) {
- writtenBytes += increment;
- lastWriteTime = System.currentTimeMillis();
- idleCountForBoth = 0;
- idleCountForWrite = 0;
+ if (increment > 0) {
+ writtenBytes += increment;
+ lastWriteTime = System.currentTimeMillis();
+ idleCountForBoth = 0;
+ idleCountForWrite = 0;
+ }
}
public void increaseReadMessages() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Fri Aug 17 21:39:46 2007
@@ -371,66 +371,73 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for (;;) {
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
- if (req == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
+ int writtenBytes = 0;
+ int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req;
synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
+ req = writeRequestQueue.peek();
}
-
- session.increaseWrittenMessages();
- buf.reset();
- ((DatagramFilterChain) session.getFilterChain())
- .fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int writtenBytes = ch.send(buf.buf(), destination);
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
+
+ if (req == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ ((DatagramFilterChain) session.getFilterChain())
+ .fireMessageSent(session, req);
+ continue;
+ }
+
+ SocketAddress destination = req.getDestination();
+ if (destination == null) {
+ destination = session.getRemoteAddress();
+ }
+
+ int localWrittenBytes = ch.send(buf.buf(), destination);
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ writtenBytes += localWrittenBytes;
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
}
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Fri Aug 17 21:39:46 2007
@@ -325,60 +325,67 @@
}
private void flush(DatagramSessionImpl session) throws IOException {
- DatagramChannel ch = session.getChannel();
+ // Clear OP_WRITE
+ SelectionKey key = session.getSelectionKey();
+ if (key == null) {
+ scheduleFlush(session);
+ return;
+ }
+ if (!key.isValid()) {
+ return;
+ }
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+ DatagramChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
- for (;;) {
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
- if (req == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
+ int writtenBytes = 0;
+ int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+ try {
+ for (;;) {
+ WriteRequest req;
synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
+ req = writeRequestQueue.peek();
}
-
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- break;
- }
- if (!key.isValid()) {
- continue;
- }
-
- int writtenBytes = ch.write(buf.buf());
-
- if (writtenBytes == 0) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else if (writtenBytes > 0) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
- // pop and fire event
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
+
+ if (req == null) {
+ break;
+ }
+
+ ByteBuffer buf = (ByteBuffer) req.getMessage();
+ if (buf.remaining() == 0) {
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ int localWrittenBytes = ch.write(buf.buf());
+ if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ } else {
+ key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+
+ // pop and fire event
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ writtenBytes += localWrittenBytes;
+ session.increaseWrittenMessages();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
}
-
- session.increaseWrittenBytes(writtenBytes);
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
}
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?view=diff&rev=567228&r1=567227&r2=567228
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Aug 17 21:39:46 2007
@@ -388,74 +388,76 @@
SocketChannel ch = session.getChannel();
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- for (;;) {
- WriteRequest req;
-
- // Check for pending writes.
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
- if (req == null) {
- break;
- }
-
- Object message = req.getMessage();
- if (message instanceof FileRegion) {
- FileRegion region = (FileRegion) message;
-
- if (region.getCount() <= 0) {
- // File has been sent, remove from queue
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
- session.increaseWrittenMessages();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
+ int writtenBytes = 0;
+ int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
+ try {
+ do {
+ WriteRequest req;
+
+ // Check for pending writes.
+ synchronized (writeRequestQueue) {
+ req = writeRequestQueue.peek();
}
-
- if (key.isWritable()) {
- long writtenBytes = region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
- region.setPosition(region.getPosition() + writtenBytes);
-
- if (writtenBytes > 0) {
- session.increaseWrittenBytes(writtenBytes);
- }
- }
-
- if (region.getCount() > 0) {
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+
+ if (req == null) {
break;
}
-
- } else {
- ByteBuffer buf = (ByteBuffer) message;
- if (buf.remaining() == 0) {
- // Buffer has been completely sent, remove request form queue
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- if (key.isWritable()) {
- int writtenBytes = ch.write(buf.buf());
- if (writtenBytes > 0) {
- session.increaseWrittenBytes(writtenBytes);
+
+ Object message = req.getMessage();
+ if (message instanceof FileRegion) {
+ FileRegion region = (FileRegion) message;
+
+ if (region.getCount() <= 0) {
+ // File has been sent, remove from queue
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+ session.increaseWrittenMessages();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if (key.isWritable()) {
+ long localWrittenBytes =
+ region.getFileChannel().transferTo(region.getPosition(), region.getCount(), ch);
+ region.setPosition(region.getPosition() + localWrittenBytes);
+ writtenBytes += localWrittenBytes;
+ }
+
+ if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
+ }
+
+ } else {
+ ByteBuffer buf = (ByteBuffer) message;
+ if (buf.remaining() == 0) {
+ // Buffer has been completely sent, remove request form queue
+ synchronized (writeRequestQueue) {
+ writeRequestQueue.poll();
+ }
+
+ session.increaseWrittenMessages();
+
+ buf.reset();
+ session.getFilterChain().fireMessageSent(session, req);
+ continue;
+ }
+
+ if (key.isWritable()) {
+ writtenBytes += ch.write(buf.buf());
+ }
+
+ if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ break;
}
}
-
- if (buf.hasRemaining()) {
- // Kernel buffer is full
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
- }
- }
+ } while (writtenBytes < maxWrittenBytes);
+ } finally {
+ session.increaseWrittenBytes(writtenBytes);
}
}