You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/14 12:04:56 UTC
svn commit: r575603 - in /mina:
branches/1.0/core/src/main/java/org/apache/mina/common/support/
branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/
branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/
branches...
Author: trustin
Date: Fri Sep 14 03:04:45 2007
New Revision: 575603
URL: http://svn.apache.org/viewvc?rev=575603&view=rev
Log:
Resolved issues:
* DIRMINA-429 - AbstractIoFilterChain.doWrite slows as outstanding writes increase
* DIRMINA-430 - IoSession.getScheduledWriteBytes() slows with size of queue
* DIRMINA-431 - Deadlock in VmPipe mode
How I resolved them:
* Applied the same fix to Datagram transport
* Backported all fixes to 1.0
* Moved some common methods to BaseIoSession (AbstractIoSession)
Added:
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (with props)
Modified:
mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Sep 14 03:04:45 2007
@@ -27,12 +27,17 @@
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TrafficMask;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.IoFilter.WriteRequest;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
/**
* Base implementation of {@link IoSession}.
*
@@ -40,6 +45,16 @@
* @version $Rev$, $Date$
*/
public abstract class BaseIoSession implements IoSession {
+
+ private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+ new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ BaseIoSession s = (BaseIoSession) future.getSession();
+ s.scheduledWriteBytes.set(0);
+ s.scheduledWriteRequests.set(0);
+ }
+ };
+
private final Map attributes = new HashMap(8);
private final long creationTime;
@@ -49,6 +64,12 @@
*/
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
+ private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+
+ private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
+ private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
+
private boolean closing;
// Configuration variables
@@ -90,6 +111,7 @@
protected BaseIoSession() {
creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
.currentTimeMillis();
+ closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
}
public boolean isConnected() {
@@ -103,6 +125,19 @@
public CloseFuture getCloseFuture() {
return closeFuture;
}
+
+ public boolean isScheduledForFlush() {
+ return scheduledForFlush.get();
+ }
+
+ public boolean setScheduledForFlush(boolean flag) {
+ if (flag) {
+ return scheduledForFlush.compareAndSet(false, true);
+ } else {
+ scheduledForFlush.set(false);
+ return true;
+ }
+ }
public CloseFuture close() {
synchronized (this) {
@@ -302,6 +337,14 @@
public long getWrittenMessages() {
return writtenMessages;
}
+
+ public int getScheduledWriteBytes() {
+ return scheduledWriteBytes.get();
+ }
+
+ public int getScheduledWriteRequests() {
+ return scheduledWriteRequests.get();
+ }
public void increaseReadBytes(int increment) {
if (increment > 0) {
@@ -318,15 +361,26 @@
lastWriteTime = System.currentTimeMillis();
idleCountForBoth = 0;
idleCountForWrite = 0;
+
+ scheduledWriteBytes.addAndGet(-increment);
}
}
-
+
public void increaseReadMessages() {
readMessages++;
}
public void increaseWrittenMessages() {
writtenMessages++;
+ scheduledWriteRequests.decrementAndGet();
+ }
+
+ public void increaseScheduledWriteBytes(int increment) {
+ scheduledWriteBytes.addAndGet(increment);
+ }
+
+ public void increaseScheduledWriteRequests() {
+ scheduledWriteRequests.incrementAndGet();
}
public long getCreationTime() {
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Sep 14 03:04:45 2007
@@ -45,14 +45,22 @@
// SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+ buffer.mark();
+
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ s.increaseScheduledWriteRequests();
+ } else {
+ s.increaseScheduledWriteBytes(buffer.remaining());
+ }
+
synchronized (writeRequestQueue) {
writeRequestQueue.push(writeRequest);
- if (writeRequestQueue.size() == 1
- && session.getTrafficMask().isWritable()) {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
- s.getIoProcessor().flush(s);
- }
+ }
+
+ if (session.getTrafficMask().isWritable()) {
+ s.getIoProcessor().flush(s);
}
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -101,10 +101,11 @@
}
void flush(SocketSessionImpl session) {
- scheduleFlush(session);
- Selector selector = getSelector();
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = getSelector();
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
@@ -122,10 +123,16 @@
}
}
- private void scheduleFlush(SocketSessionImpl session) {
- synchronized (flushingSessions) {
- flushingSessions.push(session);
+ private boolean scheduleFlush(SocketSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ synchronized (flushingSessions) {
+ flushingSessions.push(session);
+ }
+
+ return true;
}
+
+ return false;
}
private void scheduleTrafficControl(SocketSessionImpl session) {
@@ -342,6 +349,8 @@
if (session == null)
break;
+
+ session.setScheduledForFlush(false);
if (!session.isConnected()) {
releaseWriteBuffers(session);
@@ -362,7 +371,10 @@
}
try {
- doFlush(session);
+ boolean flushedAll = doFlush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
scheduleRemove(session);
session.getFilterChain().fireExceptionCaught(session, e);
@@ -403,7 +415,7 @@
}
}
- private void doFlush(SocketSessionImpl session) throws IOException {
+ private boolean doFlush(SocketSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -444,12 +456,14 @@
if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much.
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
}
}
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void doUpdateTrafficMask() {
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -158,18 +158,6 @@
return writeRequestQueue;
}
- public int getScheduledWriteRequests() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.messageSize();
- }
- }
-
- public int getScheduledWriteBytes() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.byteSize();
- }
- }
-
protected void write0(WriteRequest writeRequest) {
filterChain.fireFilterWrite(this, writeRequest);
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Sep 14 03:04:45 2007
@@ -287,19 +287,25 @@
}
public void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = getSelector();
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = getSelector();
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
public void closeSession(DatagramSessionImpl session) {
}
- private void scheduleFlush(DatagramSessionImpl session) {
- synchronized (flushingSessions) {
- flushingSessions.push(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ synchronized (flushingSessions) {
+ flushingSessions.push(session);
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -416,23 +422,28 @@
if (session == null)
break;
+ session.setScheduledForFlush(false);
+
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -476,7 +487,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
// pop and fire event
synchronized (writeRequestQueue) {
@@ -491,6 +502,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Sep 14 03:04:45 2007
@@ -229,16 +229,22 @@
}
public void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = getSelector();
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = getSelector();
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
- private void scheduleFlush(DatagramSessionImpl session) {
- synchronized (flushingSessions) {
- flushingSessions.push(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ synchronized (flushingSessions) {
+ flushingSessions.push(session);
+ }
+ return true;
+ } else {
+ return false;
}
}
@@ -418,24 +424,29 @@
if (session == null)
break;
+
+ session.setScheduledForFlush(false);
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -473,7 +484,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -490,6 +501,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -43,14 +43,22 @@
// SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+ buffer.mark();
+
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ s.increaseScheduledWriteRequests();
+ } else {
+ s.increaseScheduledWriteBytes(buffer.remaining());
+ }
+
synchronized (writeRequestQueue) {
writeRequestQueue.push(writeRequest);
- if (writeRequestQueue.size() == 1
- && session.getTrafficMask().isWritable()) {
- // Notify DatagramService only when writeRequestQueue was empty.
- s.getManagerDelegate().flushSession(s);
- }
+ }
+
+ if (session.getTrafficMask().isWritable()) {
+ s.getManagerDelegate().flushSession(s);
}
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -167,18 +167,6 @@
filterChain.fireFilterWrite(this, writeRequest);
}
- public int getScheduledWriteRequests() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.messageSize();
- }
- }
-
- public int getScheduledWriteBytes() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.byteSize();
- }
- }
-
public TransportType getTransportType() {
return TransportType.DATAGRAM;
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java Fri Sep 14 03:04:45 2007
@@ -89,7 +89,7 @@
DefaultConnectFuture future = new DefaultConnectFuture();
VmPipeSessionImpl localSession = new VmPipeSessionImpl(this, config,
- getListeners(), new Object(), // lock
+ getListeners(), // lock
new AnonymousSocketAddress(), handler, entry);
// initialize connector session
@@ -127,6 +127,8 @@
remoteSession.close();
}
+ // Start chains, and then allow and messages read/written to be processed. This is to ensure that
+ // sessionOpened gets received before a messageReceived
((VmPipeFilterChain) localSession.getFilterChain()).start();
((VmPipeFilterChain) remoteSession.getFilterChain()).start();
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -27,6 +27,7 @@
import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @author The Apache Directory Project (mina-dev@directory.apache.org)
@@ -36,20 +37,22 @@
private final Queue eventQueue = new ConcurrentLinkedQueue();
- private boolean flushEnabled;
+ private final AtomicBoolean flushEnabled = new AtomicBoolean();
+ private final AtomicBoolean sessionOpened = new AtomicBoolean();
public VmPipeFilterChain(IoSession session) {
super(session);
}
public void start() {
- flushEnabled = true;
+ flushEnabled.set(true);
flushEvents();
+ flushPendingDataQueues((VmPipeSessionImpl) getSession());
}
private void pushEvent(Event e) {
eventQueue.offer(e);
- if (flushEnabled) {
+ if (flushEnabled.get()) {
flushEvents();
}
}
@@ -68,12 +71,8 @@
if (type == EventType.RECEIVED) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (!s.getTrafficMask().isReadable()) {
- synchronized (s.pendingDataQueue) {
- s.pendingDataQueue.push(data);
- }
- } else {
+ if (sessionOpened.get() && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
+ try {
int byteCount = 1;
if (data instanceof ByteBuffer) {
byteCount = ((ByteBuffer) data).remaining();
@@ -82,7 +81,13 @@
s.increaseReadBytes(byteCount);
super.fireMessageReceived(s, data);
+ } finally {
+ s.getLock().unlock();
}
+
+ flushPendingDataQueues(s);
+ } else {
+ s.pendingDataQueue.add(data);
}
} else if (type == EventType.WRITE) {
super.fireFilterWrite(session, (WriteRequest) data);
@@ -94,6 +99,7 @@
super.fireSessionIdle(session, (IdleStatus) data);
} else if (type == EventType.OPENED) {
super.fireSessionOpened(session);
+ sessionOpened.set(true);
} else if (type == EventType.CREATED) {
super.fireSessionCreated(session);
} else if (type == EventType.CLOSED) {
@@ -103,6 +109,11 @@
}
}
+ private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
+ s.updateTrafficMask();
+ s.getRemoteSession().updateTrafficMask();
+ }
+
public void fireFilterClose(IoSession session) {
pushEvent(new Event(EventType.CLOSE, null));
}
@@ -141,14 +152,9 @@
protected void doWrite(IoSession session, WriteRequest writeRequest) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
- if (s.isConnected()) {
-
- if (!s.getTrafficMask().isWritable()) {
- synchronized (s.pendingDataQueue) {
- s.pendingDataQueue.push(writeRequest);
- }
- } else {
+ if (s.isConnected()) {
+ if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
+ try {
Object message = writeRequest.getMessage();
int byteCount = 1;
@@ -164,26 +170,40 @@
messageCopy = wb;
}
+ // Avoid unwanted side effect that scheduledWrite* becomes negative
+ // by increasing them.
+ s.increaseScheduledWriteBytes(byteCount);
+ s.increaseScheduledWriteRequests();
+
s.increaseWrittenBytes(byteCount);
s.increaseWrittenMessages();
s.getRemoteSession().getFilterChain().fireMessageReceived(
s.getRemoteSession(), messageCopy);
s.getFilterChain().fireMessageSent(s, writeRequest);
+ } finally {
+ s.getLock().unlock();
}
+
+ flushPendingDataQueues( s );
} else {
- writeRequest.getFuture().setWritten(false);
+ s.pendingDataQueue.add(writeRequest);
}
+ } else {
+ writeRequest.getFuture().setWritten(false);
}
}
protected void doClose(IoSession session) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- synchronized (s.lock) {
+ s.getLock().lock();
+ try {
if (!session.getCloseFuture().isClosed()) {
s.getServiceListeners().fireSessionDestroyed(s);
s.getRemoteSession().close();
}
+ } finally {
+ s.getLock().unlock();
}
}
@@ -223,7 +243,7 @@
private final Object data;
- public Event(EventType type, Object data) {
+ private Event(EventType type, Object data) {
this.type = type;
this.data = data;
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -34,6 +34,9 @@
import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.util.Queue;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+
/**
* A {@link IoSession} for in-VM transport (VM_PIPE).
*
@@ -62,7 +65,7 @@
private final VmPipeSessionImpl remoteSession;
- final Object lock;
+ private final Lock lock;
final Queue pendingDataQueue;
@@ -70,12 +73,12 @@
* Constructor for client-side session.
*/
public VmPipeSessionImpl(IoService service, IoServiceConfig serviceConfig,
- IoServiceListenerSupport serviceListeners, Object lock,
+ IoServiceListenerSupport serviceListeners,
SocketAddress localAddress, IoHandler handler, VmPipe remoteEntry) {
this.service = service;
this.serviceConfig = serviceConfig;
this.serviceListeners = serviceListeners;
- this.lock = lock;
+ this.lock = new ReentrantLock();
this.localAddress = localAddress;
this.remoteAddress = this.serviceAddress = remoteEntry.getAddress();
this.handler = handler;
@@ -137,14 +140,6 @@
this.filterChain.fireFilterWrite(this, writeRequest);
}
- public int getScheduledWriteRequests() {
- return 0;
- }
-
- public int getScheduledWriteBytes() {
- return 0;
- }
-
public TransportType getTransportType() {
return TransportType.VM_PIPE;
}
@@ -184,5 +179,9 @@
}
}
}
+ }
+
+ Lock getLock() {
+ return lock;
}
}
Modified: mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java (original)
+++ mina/branches/1.0/core/src/main/java/org/apache/mina/util/Queue.java Fri Sep 14 03:04:45 2007
@@ -25,9 +25,6 @@
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilter.WriteRequest;
-
/**
* A unbounded circular queue.
*
@@ -153,116 +150,6 @@
return size;
}
- /**
- * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
- * in this queue.
- */
- public int byteSize() {
- if (isEmpty()) {
- return 0;
- }
-
- int byteSize = 0;
-
- if (first < last) {
- for (int i = first; i < last; i++) {
- if (items[i] instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) items[i]).remaining();
- } else if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) message).remaining();
- }
- }
- }
- } else {
- for (int i = first; i < items.length; i++) {
- if (items[i] instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) items[i]).remaining();
- } else if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) message).remaining();
- }
- }
- }
- for (int i = last - 1; i >= 0; i--) {
- if (items[i] instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) items[i]).remaining();
- } else if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- byteSize += ((ByteBuffer) message).remaining();
- }
- }
- }
- }
-
- return byteSize;
- }
-
- public int messageSize() {
- if (isEmpty()) {
- return 0;
- }
-
- int messageSize = 0;
-
- if (first < last) {
- for (int i = first; i < last; i++) {
- if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- messageSize ++;
- }
- } else {
- messageSize ++;
- }
- } else if (items[i] instanceof ByteBuffer) {
- if (((ByteBuffer) items[i]).hasRemaining()) {
- messageSize ++;
- }
- }
- }
- } else {
- for (int i = first; i < items.length; i++) {
- if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- messageSize ++;
- }
- } else {
- messageSize ++;
- }
- } else if (items[i] instanceof ByteBuffer) {
- if (((ByteBuffer) items[i]).hasRemaining()) {
- messageSize ++;
- }
- }
- }
- for (int i = last - 1; i >= 0; i--) {
- if (items[i] instanceof WriteRequest) {
- Object message = ((WriteRequest) items[i]).getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- messageSize ++;
- }
- } else {
- messageSize ++;
- }
- } else if (items[i] instanceof ByteBuffer) {
- if (((ByteBuffer) items[i]).hasRemaining()) {
- messageSize ++;
- }
- }
- }
- }
-
- return messageSize;
- }
-
public String toString() {
return "first=" + first + ", last=" + last + ", size=" + size
+ ", mask = " + mask;
Modified: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java (original)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/AbstractTrafficControlTest.java Fri Sep 14 03:04:45 2007
@@ -75,7 +75,7 @@
future.join();
IoSession session = future.getSession();
- // We wait for the sessionCreated() event is fired becayse we cannot guarentee that
+ // We wait for the sessionCreated() event is fired because we cannot guarantee that
// it is invoked already.
while (session.getAttribute("lock") == null) {
Thread.yield();
Added: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
URL: http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java?rev=575603&view=auto
==============================================================================
--- mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java (added)
+++ mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java Fri Sep 14 03:04:45 2007
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mina.transport.vmpipe;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+import junit.framework.TestCase;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.ThreadModel;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Regression test for cross-communication between two VM-pipe sessions that
+ * are opened through the same connector against the same acceptor address.
+ * It was added in the commit whose log lists DIRMINA-431 ("Deadlock in
+ * VmPipe mode") among the resolved issues.
+ * <p>
+ * Message flow exercised by the test:
+ * <ol>
+ * <li>The first client session (c1) writes "start"; the server handler
+ * answers "open new".</li>
+ * <li>While still inside c1's messageReceived(), the client opens a second
+ * session (c2) with the same connector, joins the connect future and then
+ * waits on a latch.</li>
+ * <li>c2 writes "re-use c1" from sessionOpened(); the server handler
+ * answers "tell me something on c1 now".</li>
+ * <li>c2's handler counts the latch down and writes through c1; once the
+ * latch is released, c1's handler writes through c2.</li>
+ * </ol>
+ * The connector's default config is switched to ThreadModel.MANUAL before
+ * connecting. The test thread polls in 100 ms steps until the server handler
+ * has counted both "please don't deadlock" messages, and fails with the lock
+ * owners and stack traces of the affected threads if the JVM reports a
+ * monitor deadlock in the meantime.
+ *
+ * @author Apache Mina Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class VmPipeSessionCrossCommunicationTest extends TestCase {
+ public void testOneSessionTalkingBackAndForthDoesNotDeadlock() throws Exception {
+ final VmPipeAddress address = new VmPipeAddress( 1 );
+ final IoConnector connector = new VmPipeConnector();
+ final AtomicReference c1 = new AtomicReference();
+ final CountDownLatch latch = new CountDownLatch( 1 );
+ final CountDownLatch messageCount = new CountDownLatch( 2 );
+ IoAcceptor acceptor = new VmPipeAcceptor();
+
+ acceptor.bind( address, new IoHandlerAdapter() {
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "start".equals( message ) ) {
+ session.write( "open new" );
+ } else if ( "re-use c1".equals( message ) ) {
+ session.write( "tell me something on c1 now" );
+ } else if ( ( (String) message ).startsWith( "please don't deadlock" ) ) {
+ messageCount.countDown();
+ } else {
+ fail( "unexpected message received " + message );
+ }
+ }
+ } );
+
+ connector.getDefaultConfig().setThreadModel( ThreadModel.MANUAL );
+
+ ConnectFuture future = connector.connect( address, new IoHandlerAdapter() {
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "open new".equals( message ) ) {
+ System.out.println( "opening c2 from " + Thread.currentThread().getName() );
+
+ ConnectFuture c2Future = connector.connect( address, new IoHandlerAdapter() {
+ public void sessionOpened( IoSession session ) throws Exception {
+ session.write( "re-use c1" );
+ }
+
+ public void messageReceived( IoSession session, Object message ) throws Exception {
+ System.out.println( Thread.currentThread().getName() + ": " + message );
+
+ if ( "tell me something on c1 now".equals( message ) ) {
+ latch.countDown();
+ ((IoSession) c1.get()).write( "please don't deadlock via c1" );
+ } else {
+ fail( "unexpected message received " + message );
+ }
+ }
+ } );
+
+ c2Future.join();
+
+ latch.await();
+
+ c2Future.getSession().write( "please don't deadlock via c2" );
+ } else {
+ fail( "unexpeced message received " + message );
+ }
+ }
+ } );
+
+ future.join();
+
+ c1.set( future.getSession() );
+ ((IoSession) c1.get()).write( "start" );
+
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+ while ( !messageCount.await( 100, TimeUnit.MILLISECONDS ) ) {
+ long[] threads = threadMXBean.findMonitorDeadlockedThreads();
+
+ if ( null != threads ) {
+ StringBuffer sb = new StringBuffer( 256 );
+ ThreadInfo[] infos = threadMXBean.getThreadInfo( threads, Integer.MAX_VALUE );
+
+ for (int i = 0; i < infos.length; i ++) {
+ ThreadInfo info = infos[i];
+ sb.append( info.getThreadName() )
+ .append( " blocked on " )
+ .append( info.getLockName() )
+ .append( " owned by " )
+ .append( info.getLockOwnerName() )
+ .append( "\n" );
+ }
+
+ for (int i = 0; i < infos.length; i ++) {
+ ThreadInfo info = infos[i];
+ sb.append( "\nStack for " ).append( info.getThreadName() ).append( "\n" );
+ StackTraceElement[] stackTrace = info.getStackTrace();
+ for (int j = 0; j < stackTrace.length; j ++) {
+ sb.append( "\t" ).append( stackTrace[j] ).append( "\n" );
+ }
+ }
+
+ fail( "deadlocked! \n" + sb );
+ }
+ }
+
+ ( (IoAcceptorConfig) acceptor.getDefaultConfig() ).setDisconnectOnUnbind( false );
+ acceptor.unbindAll();
+ }
+}
Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeSessionCrossCommunicationTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/common/support/BaseIoSession.java Fri Sep 14 03:04:45 2007
@@ -25,9 +25,13 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoService;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TrafficMask;
@@ -41,6 +45,16 @@
* @version $Rev$, $Date$
*/
public abstract class BaseIoSession implements IoSession {
+
+ private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+ new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ BaseIoSession s = (BaseIoSession) future.getSession();
+ s.scheduledWriteBytes.set(0);
+ s.scheduledWriteRequests.set(0);
+ }
+ };
+
private final Object lock = new Object();
private final Map<String, Object> attributes = Collections
@@ -52,6 +66,12 @@
* A future that will be set 'closed' when the connection is closed.
*/
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
+
+ private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+
+ private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
+
+ private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
private volatile boolean closing;
@@ -94,6 +114,7 @@
protected BaseIoSession() {
creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
.currentTimeMillis();
+ closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
}
public boolean isConnected() {
@@ -107,6 +128,19 @@
public CloseFuture getCloseFuture() {
return closeFuture;
}
+
+ public boolean isScheduledForFlush() {
+ return scheduledForFlush.get();
+ }
+
+ public boolean setScheduledForFlush(boolean flag) {
+ if (flag) {
+ return scheduledForFlush.compareAndSet(false, true);
+ } else {
+ scheduledForFlush.set(false);
+ return true;
+ }
+ }
public CloseFuture close() {
synchronized (lock) {
@@ -300,6 +334,14 @@
public long getWrittenMessages() {
return writtenMessages;
}
+
+ public int getScheduledWriteBytes() {
+ return scheduledWriteBytes.get();
+ }
+
+ public int getScheduledWriteRequests() {
+ return scheduledWriteRequests.get();
+ }
public void increaseReadBytes(int increment) {
if (increment > 0) {
@@ -316,15 +358,26 @@
lastWriteTime = System.currentTimeMillis();
idleCountForBoth = 0;
idleCountForWrite = 0;
+
+ scheduledWriteBytes.addAndGet(-increment);
}
}
-
+
public void increaseReadMessages() {
readMessages++;
}
public void increaseWrittenMessages() {
writtenMessages++;
+ scheduledWriteRequests.decrementAndGet();
+ }
+
+ public void increaseScheduledWriteBytes(int increment) {
+ scheduledWriteBytes.addAndGet(increment);
+ }
+
+ public void increaseScheduledWriteRequests() {
+ scheduledWriteRequests.incrementAndGet();
}
public long getCreationTime() {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Sep 14 03:04:45 2007
@@ -49,7 +49,13 @@
ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
buffer.mark();
- s.getScheduledWriteBytesCounter().addAndGet(buffer.remaining());
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ s.increaseScheduledWriteRequests();
+ } else {
+ s.increaseScheduledWriteBytes(buffer.remaining());
+ }
+
writeRequestQueue.add(writeRequest);
if (session.getTrafficMask().isWritable()) {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -110,7 +110,7 @@
}
private boolean scheduleFlush(SocketSessionImpl session) {
- if ( session.getInFlushQueue().compareAndSet( false, true ) ) {
+ if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
return true;
@@ -306,7 +306,7 @@
if (session == null)
break;
- session.getInFlushQueue().set( false );
+ session.setScheduledForFlush(false);
if (!session.isConnected()) {
releaseWriteBuffers(session);
@@ -328,7 +328,7 @@
try {
boolean flushedAll = doFlush(session);
- if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.getInFlushQueue().get()) {
+ if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
scheduleFlush( session );
}
} catch (IOException e) {
@@ -342,11 +342,10 @@
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
- while ((req = writeRequestQueue.poll()) != null) {
+ if ((req = writeRequestQueue.poll()) != null) {
ByteBuffer buf = (ByteBuffer) req.getMessage();
try {
buf.release();
- session.getScheduledWriteBytesCounter().addAndGet( -buf.remaining() );
} catch (IllegalStateException e) {
session.getFilterChain().fireExceptionCaught(session, e);
} finally {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -25,10 +25,7 @@
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -37,7 +34,7 @@
import org.apache.mina.common.IoSessionConfig;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
-import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.common.support.BaseIoSessionConfig;
import org.apache.mina.common.support.IoServiceListenerSupport;
@@ -73,10 +70,6 @@
private final IoServiceListenerSupport serviceListeners;
- private final AtomicBoolean inFlushQueue = new AtomicBoolean( false );
-
- private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
-
private SelectionKey key;
private int readBufferSize = 1024;
@@ -167,43 +160,6 @@
return writeRequestQueue;
}
- public int getScheduledWriteRequests() {
- int size = 0;
- for (WriteRequest request : writeRequestQueue) {
- Object message = request.getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- size ++;
- }
- } else {
- size ++;
- }
- }
-
- return size;
- }
-
- /**
- * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
- * in the writeRequestQueue queue.
- *
- * @throws ClassCastException if an element is not a {@link ByteBuffer}
- */
- public int getScheduledWriteBytes() {
- return scheduledWriteBytes.get();
- }
-
- @Override
- public void increaseWrittenBytes( int increment ) {
- super.increaseWrittenBytes( increment );
-
- scheduledWriteBytes.addAndGet( -increment );
- }
-
- AtomicInteger getScheduledWriteBytesCounter() {
- return scheduledWriteBytes;
- }
-
@Override
protected void write0(WriteRequest writeRequest) {
filterChain.fireFilterWrite(this, writeRequest);
@@ -236,10 +192,6 @@
void setReadBufferSize(int readBufferSize) {
this.readBufferSize = readBufferSize;
- }
-
- AtomicBoolean getInFlushQueue() {
- return inFlushQueue;
}
private class SessionConfigImpl extends BaseIoSessionConfig implements
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorDelegate.java Fri Sep 14 03:04:45 2007
@@ -286,18 +286,24 @@
}
public void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
public void closeSession(DatagramSessionImpl session) {
}
- private void scheduleFlush(DatagramSessionImpl session) {
- flushingSessions.add(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
}
private class Worker implements Runnable {
@@ -409,23 +415,28 @@
if (session == null)
break;
+ session.setScheduledForFlush(false);
+
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -463,7 +474,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
// pop and fire event
writeRequestQueue.poll();
@@ -476,6 +487,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Fri Sep 14 03:04:45 2007
@@ -225,15 +225,21 @@
}
public void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
- private void scheduleFlush(DatagramSessionImpl session) {
- flushingSessions.add(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
}
public void updateTrafficMask(DatagramSessionImpl session) {
@@ -400,23 +406,28 @@
if (session == null)
break;
+ session.setScheduledForFlush(false);
+
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -449,7 +460,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -464,6 +475,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -45,14 +45,20 @@
// SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
- ((ByteBuffer) writeRequest.getMessage()).mark();
- synchronized (writeRequestQueue) {
- writeRequestQueue.add(writeRequest);
- if (writeRequestQueue.size() == 1
- && session.getTrafficMask().isWritable()) {
- // Notify DatagramService only when writeRequestQueue was empty.
- s.getManagerDelegate().flushSession(s);
- }
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+ buffer.mark();
+
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ s.increaseScheduledWriteRequests();
+ } else {
+ s.increaseScheduledWriteBytes(buffer.remaining());
+ }
+
+ writeRequestQueue.add(writeRequest);
+
+ if (session.getTrafficMask().isWritable()) {
+ s.getManagerDelegate().flushSession(s);
}
}
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/socket/nio/support/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -27,8 +27,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.mina.common.BroadcastIoSession;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
@@ -38,6 +36,7 @@
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportType;
import org.apache.mina.common.WriteFuture;
+import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
import org.apache.mina.transport.socket.nio.DatagramServiceConfig;
import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
@@ -170,34 +169,6 @@
@Override
protected void write0(WriteRequest writeRequest) {
filterChain.fireFilterWrite(this, writeRequest);
- }
-
- public int getScheduledWriteRequests() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (WriteRequest request : writeRequestQueue) {
- Object message = request.getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- size ++;
- }
- } else {
- size ++;
- }
- }
- }
-
- return size;
- }
-
- public int getScheduledWriteBytes() {
- int byteSize = 0;
-
- for (WriteRequest request : writeRequestQueue) {
- byteSize += ((ByteBuffer) request.getMessage()).remaining();
- }
-
- return byteSize;
}
public TransportType getTransportType() {
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -180,6 +180,11 @@
messageCopy = wb;
}
+ // Avoid unwanted side effect that scheduledWrite* becomes negative
+ // by increasing them.
+ s.increaseScheduledWriteBytes(byteCount);
+ s.increaseScheduledWriteRequests();
+
s.increaseWrittenBytes(byteCount);
s.increaseWrittenMessages();
Modified: mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java (original)
+++ mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -144,14 +144,6 @@
this.filterChain.fireFilterWrite(this, writeRequest);
}
- public int getScheduledWriteRequests() {
- return 0;
- }
-
- public int getScheduledWriteBytes() {
- return 0;
- }
-
public TransportType getTransportType() {
return TransportType.VM_PIPE;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Sep 14 03:04:45 2007
@@ -29,6 +29,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -38,6 +41,16 @@
* @version $Rev$, $Date$
*/
public abstract class AbstractIoSession implements IoSession {
+
+ private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
+ new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ AbstractIoSession s = (AbstractIoSession) future.getSession();
+ s.scheduledWriteBytes.set(0);
+ s.scheduledWriteMessages.set(0);
+ }
+ };
+
private final Object lock = new Object();
private final Map<String, Object> attributes = Collections
@@ -50,6 +63,12 @@
*/
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
+ private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
+
+ private final AtomicLong scheduledWriteBytes = new AtomicLong();
+
+ private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
+
private volatile boolean closing;
private TrafficMask trafficMask = TrafficMask.ALL;
@@ -82,6 +101,7 @@
protected AbstractIoSession() {
creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
.currentTimeMillis();
+ closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
}
public boolean isConnected() {
@@ -96,6 +116,19 @@
return closeFuture;
}
+ public boolean isScheduledForFlush() {
+ return scheduledForFlush.get();
+ }
+
+ public boolean setScheduledForFlush(boolean flag) {
+ if (flag) {
+ return scheduledForFlush.compareAndSet(false, true);
+ } else {
+ scheduledForFlush.set(false);
+ return true;
+ }
+ }
+
public CloseFuture close() {
synchronized (lock) {
if (isClosing()) {
@@ -360,6 +393,14 @@
public long getWrittenMessages() {
return writtenMessages;
}
+
+ public long getScheduledWriteBytes() {
+ return scheduledWriteBytes.get();
+ }
+
+ public int getScheduledWriteMessages() {
+ return scheduledWriteMessages.get();
+ }
public void increaseReadBytes(int increment) {
if (increment > 0) {
@@ -376,6 +417,8 @@
lastWriteTime = System.currentTimeMillis();
idleCountForBoth = 0;
idleCountForWrite = 0;
+
+ scheduledWriteBytes.addAndGet(-increment);
}
}
@@ -385,6 +428,15 @@
public void increaseWrittenMessages() {
writtenMessages++;
+ scheduledWriteMessages.decrementAndGet();
+ }
+
+ public void increaseScheduledWriteBytes(int increment) {
+ scheduledWriteBytes.addAndGet(increment);
+ }
+
+ public void increaseScheduledWriteMessages() {
+ scheduledWriteMessages.incrementAndGet();
}
public long getCreationTime() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Fri Sep 14 03:04:45 2007
@@ -262,15 +262,21 @@
}
void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
- private void scheduleFlush(DatagramSessionImpl session) {
- flushingSessions.add(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
}
private class Worker implements Runnable {
@@ -361,24 +367,29 @@
if (session == null) {
break;
}
+
+ session.setScheduledForFlush(false);
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -421,7 +432,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -439,6 +450,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Fri Sep 14 03:04:45 2007
@@ -170,15 +170,21 @@
}
void flushSession(DatagramSessionImpl session) {
- scheduleFlush(session);
- Selector selector = this.selector;
- if (selector != null) {
- selector.wakeup();
+ if (scheduleFlush(session)) {
+ Selector selector = this.selector;
+ if (selector != null) {
+ selector.wakeup();
+ }
}
}
- private void scheduleFlush(DatagramSessionImpl session) {
- flushingSessions.add(session);
+ private boolean scheduleFlush(DatagramSessionImpl session) {
+ if (session.setScheduledForFlush(true)) {
+ flushingSessions.add(session);
+ return true;
+ } else {
+ return false;
+ }
}
void updateTrafficMask(DatagramSessionImpl session) {
@@ -315,24 +321,29 @@
if (session == null) {
break;
}
+
+ session.setScheduledForFlush(false);
try {
- flush(session);
+ boolean flushedAll = flush(session);
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ scheduleFlush(session);
+ }
} catch (IOException e) {
session.getFilterChain().fireExceptionCaught(session, e);
}
}
}
- private void flush(DatagramSessionImpl session) throws IOException {
+ private boolean flush(DatagramSessionImpl session) throws IOException {
// Clear OP_WRITE
SelectionKey key = session.getSelectionKey();
if (key == null) {
scheduleFlush(session);
- return;
+ return false;
}
if (!key.isValid()) {
- return;
+ return false;
}
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -369,7 +380,7 @@
if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
// Kernel buffer is full or wrote too much
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- break;
+ return false;
} else {
key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
@@ -387,6 +398,8 @@
} finally {
session.increaseWrittenBytes(writtenBytes);
}
+
+ return true;
}
private void registerNew() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramFilterChain.java Fri Sep 14 03:04:45 2007
@@ -43,16 +43,22 @@
protected void doWrite(IoSession session, WriteRequest writeRequest) {
DatagramSessionImpl s = (DatagramSessionImpl) session;
Queue<WriteRequest> writeRequestQueue = s.getWriteRequestQueue();
- ((ByteBuffer) writeRequest.getMessage()).mark();
- int writeRequestQueueSize;
+ ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
+ buffer.mark();
+
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ s.increaseScheduledWriteMessages();
+ } else {
+ s.increaseScheduledWriteBytes(buffer.remaining());
+ }
+
synchronized (writeRequestQueue) {
writeRequestQueue.add(writeRequest);
- writeRequestQueueSize = writeRequestQueue.size();
}
-
- if (writeRequestQueueSize == 1 && session.getTrafficMask().isWritable()) {
- // Notify SocketIoProcessor only when writeRequestQueue was empty.
+
+ if (session.getTrafficMask().isWritable()) {
IoService service = s.getService();
if (service instanceof DatagramAcceptor) {
((DatagramAcceptor) service).flushSession(s);
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -170,37 +170,6 @@
filterChain.fireFilterWrite(this, writeRequest);
}
- public int getScheduledWriteMessages() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (WriteRequest request : writeRequestQueue) {
- Object message = request.getMessage();
- if (message instanceof ByteBuffer) {
- if (((ByteBuffer) message).hasRemaining()) {
- size++;
- }
- } else {
- size++;
- }
- }
- }
-
- return size;
- }
-
- public long getScheduledWriteBytes() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (Object o : writeRequestQueue) {
- if (o instanceof ByteBuffer) {
- size += ((ByteBuffer) o).remaining();
- }
- }
- }
-
- return size;
- }
-
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Sep 14 03:04:45 2007
@@ -125,7 +125,7 @@
}
private boolean scheduleFlush(SocketSessionImpl session) {
- if (session.getInFlushQueue().compareAndSet(false, true)) {
+ if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
return true;
@@ -334,7 +334,7 @@
break;
}
- session.getInFlushQueue().set(false);
+ session.setScheduledForFlush(false);
if (!session.isConnected()) {
clearWriteRequestQueue(session);
@@ -356,7 +356,7 @@
try {
boolean flushedAll = doFlush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.getInFlushQueue().get()) {
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
scheduleFlush(session);
}
} catch (IOException e) {
@@ -370,12 +370,10 @@
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
- while ((req = writeRequestQueue.poll()) != null) {
+ if ((req = writeRequestQueue.poll()) != null) {
Object m = req.getMessage();
if (m instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) req.getMessage();
-
- session.getScheduledWriteBytesCounter().addAndGet(-buf.remaining());
// The first unwritten empty buffer must be
// forwarded to the filter chain.
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -25,8 +25,6 @@
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
@@ -69,10 +67,6 @@
private final IoHandler handler;
- private final AtomicBoolean inFlushQueue = new AtomicBoolean(false);
-
- private final AtomicLong scheduledWriteBytes = new AtomicLong();
-
private SelectionKey key;
private int readBufferSize = 1024;
@@ -149,21 +143,6 @@
return size;
}
- public long getScheduledWriteBytes() {
- return scheduledWriteBytes.get();
- }
-
- @Override
- public void increaseWrittenBytes(long increment) {
- super.increaseWrittenBytes(increment);
-
- scheduledWriteBytes.addAndGet(-increment);
- }
-
- AtomicLong getScheduledWriteBytesCounter() {
- return scheduledWriteBytes;
- }
-
@Override
protected void write0(WriteRequest writeRequest) {
filterChain.fireFilterWrite(this, writeRequest);
@@ -195,17 +174,18 @@
this.readBufferSize = readBufferSize;
}
- AtomicBoolean getInFlushQueue() {
- return inFlushQueue;
- }
-
void queueWriteRequest(WriteRequest writeRequest) {
if (writeRequest.getMessage() instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) writeRequest.getMessage();
// SocketIoProcessor.doFlush() will reset it after write is finished
// because the buffer will be passed with messageSent event.
buffer.mark();
- scheduledWriteBytes.addAndGet(buffer.remaining());
+ int remaining = buffer.remaining();
+ if (remaining == 0) {
+ increaseScheduledWriteMessages();
+ } else {
+ increaseScheduledWriteBytes(buffer.remaining());
+ }
}
writeRequestQueue.add(writeRequest);
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Fri Sep 14 03:04:45 2007
@@ -182,6 +182,11 @@
messageCopy = wb;
}
+ // Increase scheduledWrite* first to avoid the unwanted side effect of
+ // them becoming negative.
+ s.increaseScheduledWriteBytes(byteCount);
+ s.increaseScheduledWriteMessages();
+
s.increaseWrittenBytes(byteCount);
s.increaseWrittenMessages();
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=575603&r1=575602&r2=575603&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Fri Sep 14 03:04:45 2007
@@ -144,14 +144,6 @@
this.filterChain.fireFilterWrite(this, writeRequest);
}
- public int getScheduledWriteMessages() {
- return 0;
- }
-
- public long getScheduledWriteBytes() {
- return 0;
- }
-
public VmPipeAddress getRemoteAddress() {
return remoteAddress;
}