You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/17 08:43:05 UTC
svn commit: r576271 - in /mina/trunk:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/transport/socket/nio/
core/src/main/java/org/apache/mina/transport/vmpipe/
core/src/main/java/org/apache/mina/util/ core/src/test/java/o...
Author: trustin
Date: Sun Sep 16 23:43:04 2007
New Revision: 576271
URL: http://svn.apache.org/viewvc?rev=576271&view=rev
Log:
* Removed IoSession parameter from IoFilterChain because it's redundant
* Removed WriteRequest parameter from IoProcessor.flush because it's unnecesary
* Multi-thread support for both DatagramAcceptor and DatagramConnector (need to test on various platforms)
* Removed IoSessionRecycler because all datagram sessions are now connected datagram socket.
Removed:
mina/trunk/core/src/main/java/org/apache/mina/common/ExpiringIoSessionRecycler.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionRecycler.java
mina/trunk/core/src/main/java/org/apache/mina/util/ExpirationListener.java
mina/trunk/core/src/main/java/org/apache/mina/util/ExpiringMap.java
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramRecyclerTest.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java
mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 23:43:04 2007
@@ -34,8 +34,6 @@
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$
- *
- * TODO Provide abstraction for bind, unbind and connect (+ cancellation)
*/
public abstract class AbstractIoProcessor implements IoProcessor {
@@ -96,7 +94,7 @@
startupWorker();
}
- public void flush(IoSession session, WriteRequest writeRequest) {
+ public void flush(IoSession session) {
boolean needsWakeup = flushingSessions.isEmpty();
if (scheduleFlush((AbstractIoSession) session) && needsWakeup) {
wakeup();
@@ -160,7 +158,7 @@
if (notified) {
// Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
scheduleRemove(session);
wakeup();
} else {
@@ -193,7 +191,7 @@
doRemove(session);
removedSessions ++;
} catch (Exception e) {
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
} finally {
clearWriteRequestQueue(session);
((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
@@ -256,10 +254,8 @@
buf.flip();
}
- session.increaseReadBytes(readBytes);
-
if (readBytes > 0) {
- session.getFilterChain().fireMessageReceived(session, buf);
+ session.getFilterChain().fireMessageReceived(buf);
buf = null;
if (session.getTransportMetadata().hasFragmentation()) {
@@ -282,9 +278,9 @@
}
} catch (IOException e) {
scheduleRemove(session);
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
} catch (Throwable e) {
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
}
}
@@ -298,7 +294,7 @@
try {
notifyIdleness(session, currentTime);
} catch (Exception e) {
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
}
}
}
@@ -327,7 +323,7 @@
if (idleTime > 0 && lastIoTime != 0
&& currentTime - lastIoTime >= idleTime) {
session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
+ session.getFilterChain().fireSessionIdle(status);
}
}
@@ -335,8 +331,7 @@
long currentTime, long writeTimeout, long lastIoTime) throws Exception {
if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout
&& (interestOps(session) & SelectionKey.OP_WRITE) != 0) {
- session.getFilterChain().fireExceptionCaught(session,
- new WriteTimeoutException());
+ session.getFilterChain().fireExceptionCaught(new WriteTimeoutException());
}
}
@@ -369,7 +364,7 @@
}
} catch (Exception e) {
scheduleRemove(session);
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
}
break;
case CLOSED:
@@ -400,7 +395,7 @@
if (buf.hasRemaining()) {
req.getFuture().setWritten(false);
} else {
- session.getFilterChain().fireMessageSent(session, req);
+ session.getFilterChain().fireMessageSent(req);
}
} else {
req.getFuture().setWritten(false);
@@ -424,66 +419,58 @@
int maxWrittenBytes = session.getConfig().getMaxReadBufferSize();
int writtenBytes = 0;
- try {
- do {
- // Check for pending writes.
- WriteRequest req = writeRequestQueue.peek();
-
- if (req == null) {
- break;
- }
-
- Object message = req.getMessage();
- if (message instanceof FileRegion) {
- FileRegion region = (FileRegion) message;
-
- if (region.getCount() <= 0) {
- // File has been sent, remove from queue
- writeRequestQueue.poll();
- session.increaseWrittenMessages();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
+ do {
+ // Check for pending writes.
+ WriteRequest req = writeRequestQueue.peek();
- if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
- long localWrittenBytes = transferFile(session, region);
- region.setPosition(region.getPosition() + localWrittenBytes);
- writtenBytes += localWrittenBytes;
- }
+ if (req == null) {
+ break;
+ }
- if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much.
- interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
- return false;
- }
+ Object message = req.getMessage();
+ if (message instanceof FileRegion) {
+ FileRegion region = (FileRegion) message;
+
+ if (region.getCount() <= 0) {
+ // File has been sent, remove from queue
+ writeRequestQueue.poll();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
+ }
- } else {
- ByteBuffer buf = (ByteBuffer) message;
- if (buf.remaining() == 0) {
- // Buffer has been completely sent, remove request form queue
- writeRequestQueue.poll();
-
- session.increaseWrittenMessages();
-
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
+ if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+ long localWrittenBytes = transferFile(session, region);
+ region.setPosition(region.getPosition() + localWrittenBytes);
+ writtenBytes += localWrittenBytes;
+ }
- if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
- writtenBytes += write(session, buf);
- }
+ if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+ return false;
+ }
- if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much.
- interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
- return false;
- }
+ } else {
+ ByteBuffer buf = (ByteBuffer) message;
+ if (buf.remaining() == 0) {
+ // Buffer has been completely sent, remove request form queue
+ writeRequestQueue.poll();
+ buf.reset();
+ session.getFilterChain().fireMessageSent(req);
+ continue;
}
- } while (writtenBytes < maxWrittenBytes);
- } finally {
- session.increaseWrittenBytes(writtenBytes);
- }
+
+ if ((readyOps(session) & SelectionKey.OP_WRITE) != 0) {
+ writtenBytes += write(session, buf);
+ }
+
+ if (buf.hasRemaining() || writtenBytes >= maxWrittenBytes) {
+ // Kernel buffer is full or wrote too much.
+ interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);
+ return false;
+ }
+ }
+ } while (writtenBytes < maxWrittenBytes);
return true;
}
@@ -511,7 +498,7 @@
try {
interestOps(session, ops & mask);
} catch (Exception e) {
- session.getFilterChain().fireExceptionCaught(session, e);
+ session.getFilterChain().fireExceptionCaught(e);
}
break;
case CLOSED:
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Sun Sep 16 23:43:04 2007
@@ -146,7 +146,7 @@
}
}
- getFilterChain().fireFilterClose(this);
+ getFilterChain().fireFilterClose();
return closeFuture;
}
@@ -188,7 +188,7 @@
WriteFuture future = new DefaultWriteFuture(this);
getFilterChain().fireFilterWrite(
- this, new DefaultWriteRequest(message, future, remoteAddress));
+ new DefaultWriteRequest(message, future, remoteAddress));
if (message instanceof File) {
final FileChannel finalChannel = channel;
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Sun Sep 16 23:43:04 2007
@@ -39,14 +39,14 @@
/**
* A session attribute that stores a {@link ConnectFuture} related with
* the {@link IoSession}. {@link DefaultIoFilterChain} clears this
- * attribute and notifies the future when {@link #fireSessionOpened(IoSession)}
- * or {@link #fireExceptionCaught(IoSession, Throwable)} is invoked
+ * attribute and notifies the future when {@link #fireSessionOpened()}
+ * or {@link #fireExceptionCaught(Throwable)} is invoked
*/
public static final String CONNECT_FUTURE = DefaultIoFilterChain.class
.getName()
+ ".connectFuture";
- private final IoSession session;
+ private final AbstractIoSession session;
private final Map<String, Entry> name2entry = new HashMap<String, Entry>();
@@ -271,7 +271,7 @@
}
}
- public void fireSessionCreated(IoSession session) {
+ public void fireSessionCreated() {
Entry head = this.head;
callNextSessionCreated(head, session);
}
@@ -280,11 +280,11 @@
try {
entry.getFilter().sessionCreated(entry.getNextFilter(), session);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireSessionOpened(IoSession session) {
+ public void fireSessionOpened() {
Entry head = this.head;
callNextSessionOpened(head, session);
}
@@ -293,16 +293,16 @@
try {
entry.getFilter().sessionOpened(entry.getNextFilter(), session);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireSessionClosed(IoSession session) {
+ public void fireSessionClosed() {
// Update future.
try {
session.getCloseFuture().setClosed();
} catch (Throwable t) {
- fireExceptionCaught(session, t);
+ fireExceptionCaught(t);
}
// And start the chain.
@@ -315,11 +315,11 @@
entry.getFilter().sessionClosed(entry.getNextFilter(), session);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireSessionIdle(IoSession session, IdleStatus status) {
+ public void fireSessionIdle(IdleStatus status) {
Entry head = this.head;
callNextSessionIdle(head, session, status);
}
@@ -330,11 +330,15 @@
entry.getFilter().sessionIdle(entry.getNextFilter(), session,
status);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireMessageReceived(IoSession session, Object message) {
+ public void fireMessageReceived(Object message) {
+ if (message instanceof ByteBuffer) {
+ session.increaseReadBytes(((ByteBuffer) message).remaining());
+ }
+
Entry head = this.head;
callNextMessageReceived(head, session, message);
}
@@ -345,15 +349,27 @@
entry.getFilter().messageReceived(entry.getNextFilter(), session,
message);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireMessageSent(IoSession session, WriteRequest request) {
+ public void fireMessageSent(WriteRequest request) {
+ Object message = request.getMessage();
+ if (message instanceof ByteBuffer) {
+ ByteBuffer b = (ByteBuffer) message;
+ if (b.hasRemaining()) {
+ session.increaseWrittenBytes(((ByteBuffer) message).remaining());
+ } else {
+ session.increaseWrittenMessages();
+ }
+ } else {
+ session.increaseWrittenMessages();
+ }
+
try {
request.getFuture().setWritten(true);
} catch (Throwable t) {
- fireExceptionCaught(session, t);
+ fireExceptionCaught(t);
}
Entry head = this.head;
@@ -366,11 +382,11 @@
entry.getFilter().messageSent(entry.getNextFilter(), session,
writeRequest);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireExceptionCaught(IoSession session, Throwable cause) {
+ public void fireExceptionCaught(Throwable cause) {
// Notify the related ConnectFuture
// if the session is created from SocketConnector.
ConnectFuture future = (ConnectFuture) session
@@ -396,7 +412,7 @@
}
}
- public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
+ public void fireFilterWrite(WriteRequest writeRequest) {
Entry tail = this.tail;
callPreviousFilterWrite(tail, session, writeRequest);
}
@@ -408,11 +424,11 @@
writeRequest);
} catch (Throwable e) {
writeRequest.getFuture().setWritten(false);
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
- public void fireFilterClose(IoSession session) {
+ public void fireFilterClose() {
Entry tail = this.tail;
callPreviousFilterClose(tail, session);
}
@@ -421,7 +437,7 @@
try {
entry.getFilter().filterClose(entry.getNextFilter(), session);
} catch (Throwable e) {
- fireExceptionCaught(session, e);
+ fireExceptionCaught(e);
}
}
@@ -579,7 +595,7 @@
s.getWriteRequestQueue().add(writeRequest);
if (s.getTrafficMask().isWritable()) {
- s.getProcessor().flush(s, writeRequest);
+ s.getProcessor().flush(s);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Sun Sep 16 23:43:04 2007
@@ -103,8 +103,9 @@
public void add(IoSession session) {
}
- public void flush(IoSession session, WriteRequest writeRequest) {
- getFilterChain().fireMessageSent(session, writeRequest);
+ public void flush(IoSession session) {
+ getFilterChain().fireMessageSent(
+ ((DummySession) session).getWriteRequestQueue().poll());
}
public void remove(IoSession session) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Sun Sep 16 23:43:04 2007
@@ -102,7 +102,7 @@
if (idleTime > 0 && lastIoTime != 0
&& currentTime - lastIoTime >= idleTime) {
session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
+ session.getFilterChain().fireSessionIdle(status);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoFilterChain.java Sun Sep 16 23:43:04 2007
@@ -180,63 +180,63 @@
* call this method at all. Please use this method only when you implement a new transport
* or fire a virtual event.
*/
- public void fireSessionCreated(IoSession session);
+ public void fireSessionCreated();
/**
* Fires a {@link IoHandler#sessionOpened(IoSession)} event. Most users don't need to call
* this method at all. Please use this method only when you implement a new transport or
* fire a virtual event.
*/
- public void fireSessionOpened(IoSession session);
+ public void fireSessionOpened();
/**
* Fires a {@link IoHandler#sessionClosed(IoSession)} event. Most users don't need to call
* this method at all. Please use this method only when you implement a new transport or
* fire a virtual event.
*/
- public void fireSessionClosed(IoSession session);
+ public void fireSessionClosed();
/**
* Fires a {@link IoHandler#sessionIdle(IoSession, IdleStatus)} event. Most users don't
* need to call this method at all. Please use this method only when you implement a new
* transport or fire a virtual event.
*/
- public void fireSessionIdle(IoSession session, IdleStatus status);
+ public void fireSessionIdle(IdleStatus status);
/**
- * Fires a {@link #fireMessageReceived(IoSession, Object)} event. Most users don't need to
+ * Fires a {@link #fireMessageReceived(Object)} event. Most users don't need to
* call this method at all. Please use this method only when you implement a new transport
* or fire a virtual event.
*/
- public void fireMessageReceived(IoSession session, Object message);
+ public void fireMessageReceived(Object message);
/**
* Fires a {@link IoHandler#sessionOpened(IoSession)} event. Most users don't need to call
* this method at all. Please use this method only when you implement a new transport or
* fire a virtual event.
*/
- public void fireMessageSent(IoSession session, WriteRequest request);
+ public void fireMessageSent(WriteRequest request);
/**
* Fires a {@link IoHandler#exceptionCaught(IoSession, Throwable)} event. Most users don't
* need to call this method at all. Please use this method only when you implement a new
* transport or fire a virtual event.
*/
- public void fireExceptionCaught(IoSession session, Throwable cause);
+ public void fireExceptionCaught(Throwable cause);
/**
* Fires a {@link IoSession#write(Object)} event. Most users don't need to call this
* method at all. Please use this method only when you implement a new transport or fire a
* virtual event.
*/
- public void fireFilterWrite(IoSession session, WriteRequest writeRequest);
+ public void fireFilterWrite(WriteRequest writeRequest);
/**
* Fires a {@link IoSession#close()} event. Most users don't need to call this method at
* all. Please use this method only when you implement a new transport or fire a virtual
* event.
*/
- public void fireFilterClose(IoSession session);
+ public void fireFilterClose();
/**
* Represents a name-filter pair that an {@link IoFilterChain} contains.
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java Sun Sep 16 23:43:04 2007
@@ -39,10 +39,8 @@
/**
* Flushes the internal write request queue of the specified
* {@code session}.
- *
- * @param writeRequest the write request added right now
*/
- void flush(IoSession session, WriteRequest writeRequest);
+ void flush(IoSession session);
/**
* Controls the traffic of the specified {@code session} as specified
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoServiceListenerSupport.java Sun Sep 16 23:43:04 2007
@@ -157,8 +157,8 @@
}
// Fire session events.
- session.getFilterChain().fireSessionCreated(session);
- session.getFilterChain().fireSessionOpened(session);
+ session.getFilterChain().fireSessionCreated();
+ session.getFilterChain().fireSessionOpened();
// Fire listener events.
synchronized (listeners) {
@@ -183,7 +183,7 @@
}
// Fire session events.
- session.getFilterChain().fireSessionClosed(session);
+ session.getFilterChain().fireSessionClosed();
// Fire listener events.
try {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Sun Sep 16 23:43:04 2007
@@ -28,21 +28,22 @@
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoAcceptor;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.ExpiringIoSessionRecycler;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoProcessor;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionRecycler;
import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
import org.apache.mina.util.NamePreservingRunnable;
import org.apache.mina.util.NewThreadExecutor;
@@ -54,27 +55,24 @@
*/
public class DatagramAcceptor extends AbstractIoAcceptor implements
IoAcceptor {
- private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringIoSessionRecycler();
private static volatile int nextId = 0;
- private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
-
private final Executor executor;
private final int id = nextId++;
private final Selector selector;
-
- private final IoProcessor processor = new DatagramAcceptorProcessor();
+
+ private final DatagramConnector connector;
private DatagramChannel channel;
private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
-
- private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
+
+ private final ConcurrentMap<SocketAddress, Object> cache = new ConcurrentHashMap<SocketAddress, Object>();
private Worker worker;
@@ -89,10 +87,14 @@
* Creates a new instance.
*/
public DatagramAcceptor(Executor executor) {
- super(new DefaultDatagramSessionConfig());
+ this(Runtime.getRuntime().availableProcessors() + 1, executor);
+ }
- // The default reuseAddress should be 'true' for an accepted socket.
- getSessionConfig().setReuseAddress(true);
+ /**
+ * Creates a new instance.
+ */
+ public DatagramAcceptor(int processorCount, Executor executor) {
+ super(new DefaultDatagramSessionConfig());
try {
this.selector = Selector.open();
@@ -101,6 +103,11 @@
}
this.executor = executor;
+ this.connector = new DatagramConnector(
+ this, "DatagramAcceptor-" + id, processorCount, executor);
+
+ // The default reuseAddress should be 'true' for an accepted socket.
+ getSessionConfig().setReuseAddress(true);
}
@Override
@@ -127,12 +134,28 @@
return (InetSocketAddress) super.getLocalAddress();
}
+ // This method is added to work around a problem with
+ // bean property access mechanism.
+
+ /**
+ * @see org.apache.mina.common.AbstractIoAcceptor#setLocalAddress(java.net.SocketAddress)
+ * @param localAddress the local address
+ */
+ public void setLocalAddress(InetSocketAddress localAddress) {
+ super.setLocalAddress(localAddress);
+ }
+
+ @Override
+ protected IoServiceListenerSupport getListeners() {
+ return super.getListeners();
+ }
+
@Override
protected void doBind() throws IOException {
RegistrationRequest request = new RegistrationRequest();
- startupWorker();
registerQueue.add(request);
+ startupWorker();
selector.wakeup();
synchronized (request) {
@@ -156,8 +179,8 @@
protected void doUnbind() {
CancellationRequest request = new CancellationRequest();
- startupWorker();
cancelQueue.add(request);
+ startupWorker();
selector.wakeup();
synchronized (request) {
@@ -184,99 +207,28 @@
throw new IllegalStateException(
"Can't create a session from a unbound service.");
}
-
- return newSessionWithoutLock(remoteAddress);
- }
- }
-
- private IoSession newSessionWithoutLock(SocketAddress remoteAddress) {
- Selector selector = this.selector;
- DatagramChannel ch = this.channel;
- SelectionKey key = ch.keyFor(selector);
-
- IoSession session;
- IoSessionRecycler sessionRecycler = getSessionRecycler();
- synchronized (sessionRecycler) {
- session = sessionRecycler.recycle(getLocalAddress(), remoteAddress);
- if (session != null) {
- return session;
- }
-
- // If a new session needs to be created.
- DatagramSessionImpl datagramSession = new DatagramSessionImpl(
- this, ch, getHandler(),
- (InetSocketAddress) remoteAddress);
- datagramSession.setSelectionKey(key);
-
- getSessionRecycler().put(datagramSession);
- session = datagramSession;
- }
-
- try {
- this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
- getListeners().fireSessionCreated(session);
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
- }
-
- return session;
- }
-
- /**
- * Returns the {@link IoSessionRecycler} for this service.
- */
- public IoSessionRecycler getSessionRecycler() {
- return sessionRecycler;
- }
-
- /**
- * Sets the {@link IoSessionRecycler} for this service.
- *
- * @param sessionRecycler <tt>null</tt> to use the default recycler
- */
- public void setSessionRecycler(IoSessionRecycler sessionRecycler) {
- synchronized (bindLock) {
- if (isBound()) {
- throw new IllegalStateException(
- "sessionRecycler can't be set while the acceptor is bound.");
- }
-
- if (sessionRecycler == null) {
- sessionRecycler = DEFAULT_RECYCLER;
- }
- this.sessionRecycler = sessionRecycler;
- }
- }
-
- @Override
- protected IoServiceListenerSupport getListeners() {
- return super.getListeners();
- }
-
- IoProcessor getProcessor() {
- return processor;
- }
-
- private class DatagramAcceptorProcessor implements IoProcessor {
-
- public void add(IoSession session) {
- }
-
- public void flush(IoSession session, WriteRequest writeRequest) {
- if (scheduleFlush((DatagramSessionImpl) session)) {
- Selector selector = DatagramAcceptor.this.selector;
- if (selector != null) {
- selector.wakeup();
- }
+
+ Object data;
+ synchronized (cache) {
+ data = cache.get(remoteAddress);
+ if (data == null) {
+ ConnectFuture future = connector.connect(remoteAddress, getLocalAddress());
+ cache.put(remoteAddress, future);
+ future.awaitUninterruptibly();
+ return future.getSession();
+ }
+ }
+
+ if (data instanceof ConnectFuture) {
+ ConnectFuture future = (ConnectFuture) data;
+ future.awaitUninterruptibly();
+ return future.getSession();
+ } else if (data instanceof IoSession) {
+ return ((IoSession) data);
+ } else {
+ throw new IllegalStateException();
}
}
-
- public void remove(IoSession session) {
- getListeners().fireSessionDestroyed(session);
- }
-
- public void updateTrafficMask(IoSession session) {
- }
}
private synchronized void startupWorker() {
@@ -286,15 +238,6 @@
}
}
- private boolean scheduleFlush(DatagramSessionImpl session) {
- if (session.setScheduledForFlush(true)) {
- flushingSessions.add(session);
- return true;
- } else {
- return false;
- }
- }
-
private class Worker implements Runnable {
public void run() {
Thread.currentThread().setName("DatagramAcceptor-" + id);
@@ -309,7 +252,6 @@
processReadySessions(selector.selectedKeys());
}
- flushSessions();
cancelKeys();
if (selector.keys().isEmpty()) {
@@ -346,12 +288,6 @@
if (key.isReadable()) {
readSession(ch);
}
-
- if (key.isWritable()) {
- for (IoSession session : getManagedSessions()) {
- scheduleFlush((DatagramSessionImpl) session);
- }
- }
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
}
@@ -359,104 +295,55 @@
}
private void readSession(DatagramChannel channel) throws Exception {
- ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
- .getReceiveBufferSize());
+ final ByteBuffer readBuf = ByteBuffer.allocate(getSessionConfig()
+ .getReadBufferSize());
- SocketAddress remoteAddress = channel.receive(readBuf.buf());
+ final SocketAddress remoteAddress = channel.receive(readBuf.buf());
if (remoteAddress != null) {
- DatagramSessionImpl session = (DatagramSessionImpl) newSessionWithoutLock(remoteAddress);
-
readBuf.flip();
-
- ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
- newBuf.put(readBuf);
- newBuf.flip();
-
- session.increaseReadBytes(newBuf.remaining());
- session.getFilterChain().fireMessageReceived(session, newBuf);
- }
- }
-
- private void flushSessions() {
- for (; ;) {
- DatagramSessionImpl session = flushingSessions.poll();
- if (session == null) {
- break;
- }
-
- session.setScheduledForFlush(false);
-
- try {
- boolean flushedAll = flush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
- scheduleFlush(session);
- }
- } catch (IOException e) {
- session.getFilterChain().fireExceptionCaught(session, e);
- }
- }
- }
-
- private boolean flush(DatagramSessionImpl session) throws IOException {
- // Clear OP_WRITE
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- scheduleFlush(session);
- return false;
- }
- if (!key.isValid()) {
- return false;
- }
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- DatagramChannel ch = session.getChannel();
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
- int writtenBytes = 0;
- int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
- try {
- for (; ;) {
- WriteRequest req = writeRequestQueue.peek();
- if (req == null) {
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- // pop and fire event
- writeRequestQueue.poll();
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
-
- SocketAddress destination = req.getDestination();
- if (destination == null) {
- destination = session.getRemoteAddress();
- }
-
- int localWrittenBytes = ch.send(buf.buf(), destination);
- if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
- // Kernel buffer is full or wrote too much
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- return false;
- } else {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-
- // pop and fire event
- writeRequestQueue.poll();
- writtenBytes += localWrittenBytes;
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- }
+ Object data;
+ ConnectFuture future = null;
+ synchronized (cache) {
+ data = cache.get(remoteAddress);
+ if (data == null) {
+ future = connector.connect(remoteAddress, getLocalAddress());
+ cache.put(remoteAddress, future);
+ }
+ }
+
+ if (data == null) {
+ future.addListener(new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ ConnectFuture f = (ConnectFuture) future;
+ if (f.getException() == null) {
+ IoSession s = f.getSession();
+ cache.put(remoteAddress, s);
+ s.getCloseFuture().addListener(new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ cache.remove(remoteAddress);
+ }
+ });
+ s.getFilterChain().fireMessageReceived(readBuf);
+ } else {
+ ExceptionMonitor.getInstance().exceptionCaught(f.getException());
+ }
+ }
+ });
+ } else if (data instanceof ConnectFuture) {
+ future = (ConnectFuture) data;
+ future.addListener(new IoFutureListener() {
+ public void operationComplete(IoFuture future) {
+ ConnectFuture f = (ConnectFuture) future;
+ if (f.getException() == null) {
+ IoSession s = f.getSession();
+ s.getFilterChain().fireMessageReceived(readBuf);
+ }
+ }
+ });
+ } else if (data instanceof IoSession) {
+ ((IoSession) data).getFilterChain().fireMessageReceived(readBuf);
}
- } finally {
- session.increaseWrittenBytes(writtenBytes);
}
-
- return true;
}
private void registerNew() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Sun Sep 16 23:43:04 2007
@@ -28,9 +28,13 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.DefaultIoFilterChain;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoProcessor;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.util.NewThreadExecutor;
@@ -45,8 +49,11 @@
private static volatile int nextId = 0;
private final int id = nextId++;
+ private final IoService parent;
+ private final int processorCount;
+ private final NIOProcessor[] ioProcessors;
- private final IoProcessor processor;
+ private int processorDistributor = 0;
/**
* Creates a new instance.
@@ -59,9 +66,51 @@
* Creates a new instance.
*/
public DatagramConnector(Executor executor) {
+ this(Runtime.getRuntime().availableProcessors() + 1, executor);
+ }
+
+ /**
+ * Creates a new instance.
+ */
+ public DatagramConnector(int processorCount, Executor executor) {
+ this(null, null, processorCount, executor);
+ }
+
+ DatagramConnector(
+ IoService parent, String threadNamePrefix, int processorCount, Executor executor) {
super(new DefaultDatagramSessionConfig());
+
+ // DotagramAcceptor can use DatagramConnector as a child.
+ if (parent == null) {
+ parent = this;
+ }
+ if (threadNamePrefix == null) {
+ threadNamePrefix = "DatagramConnector-" + id;
+ }
+ this.parent = parent;
+
+ if (processorCount < 1) {
+ throw new IllegalArgumentException(
+ "Must have at least one processor");
+ }
+
+ this.processorCount = processorCount;
+ ioProcessors = new NIOProcessor[processorCount];
+
+ // create an array of SocketIoProcessors that will be used for
+ // handling sessions.
+ for (int i = 0; i < processorCount; i++) {
+ ioProcessors[i] = new NIOProcessor(
+ threadNamePrefix + '.' + i, executor);
+ }
+ }
+
+ private NIOProcessor nextProcessor() {
+ if (this.processorDistributor == Integer.MAX_VALUE) {
+ this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
+ }
- processor = new NIOProcessor("DatagramConnector-" + id, executor);
+ return ioProcessors[processorDistributor++ % processorCount];
}
public TransportMetadata getTransportMetadata() {
@@ -69,8 +118,66 @@
}
@Override
+ protected IoServiceListenerSupport getListeners() {
+ if (parent == this) {
+ return super.getListeners();
+ } else {
+ return ((DatagramAcceptor) parent).getListeners();
+ }
+ }
+
+ @Override
public DatagramSessionConfig getSessionConfig() {
- return (DatagramSessionConfig) super.getSessionConfig();
+ if (parent == this) {
+ return (DatagramSessionConfig) super.getSessionConfig();
+ } else {
+ return (DatagramSessionConfig) parent.getSessionConfig();
+ }
+ }
+
+ @Override
+ public DefaultIoFilterChainBuilder getFilterChain() {
+ if (parent == this) {
+ return super.getFilterChain();
+ } else {
+ return parent.getFilterChain();
+ }
+ }
+
+ @Override
+ public IoFilterChainBuilder getFilterChainBuilder() {
+ if (parent == this) {
+ return super.getFilterChainBuilder();
+ } else {
+ return parent.getFilterChainBuilder();
+ }
+ }
+
+ @Override
+ public void setFilterChainBuilder(IoFilterChainBuilder builder) {
+ if (parent == this) {
+ super.setFilterChainBuilder(builder);
+ } else {
+ parent.setFilterChainBuilder(builder);
+ }
+ }
+
+ @Override
+ public IoHandler getHandler() {
+ if (parent == this) {
+ return super.getHandler();
+ } else {
+ return parent.getHandler();
+ }
+ }
+
+ @Override
+ public void setHandler(IoHandler handler) {
+ if (parent == this) {
+ super.setHandler(handler);
+ } else {
+ parent.setHandler(handler);
+ }
}
@Override
@@ -81,13 +188,17 @@
IoSession session = null;
try {
ch = DatagramChannel.open();
- session = new DatagramSessionImpl(this, ch, getHandler());
-
+ ch.socket().setReuseAddress(getSessionConfig().isReuseAddress());
+ ch.socket().setReuseAddress(true);
+ ch.socket().setBroadcast(getSessionConfig().isBroadcast());
+
if (localAddress != null) {
ch.socket().bind(localAddress);
}
ch.connect(remoteAddress);
+ NIOProcessor processor = nextProcessor();
+ session = new DatagramSessionImpl(parent, ch, processor);
ConnectFuture future = new DefaultConnectFuture();
// DefaultIoFilterChain will notify the connect future.
session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
@@ -107,9 +218,5 @@
}
}
}
- }
-
- IoProcessor getProcessor() {
- return processor;
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Sun Sep 16 23:43:04 2007
@@ -26,10 +26,8 @@
import java.nio.channels.SelectionKey;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
-import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoProcessor;
@@ -66,37 +64,20 @@
private final InetSocketAddress localAddress;
private final InetSocketAddress remoteAddress;
+
+ private final IoProcessor processor;
private SelectionKey key;
/**
- * Creates a new acceptor instance.
- */
- DatagramSessionImpl(
- DatagramAcceptor service,
- DatagramChannel ch, IoHandler defaultHandler,
- InetSocketAddress remoteAddress) {
- this.service = service;
- this.ch = ch;
- this.handler = defaultHandler;
- this.remoteAddress = remoteAddress;
-
- // We didn't set the localAddress by calling getLocalSocketAddress() to avoid
- // the case that getLocalSocketAddress() returns IPv6 address while
- // serviceAddress represents the same address in IPv4.
- this.localAddress = service.getLocalAddress();
-
- this.config.setAll(service.getSessionConfig());
- }
-
- /**
* Creates a new connector instance.
*/
- DatagramSessionImpl(DatagramConnector service,
- DatagramChannel ch, IoHandler defaultHandler) {
+ DatagramSessionImpl(IoService service,
+ DatagramChannel ch, IoProcessor processor) {
this.service = service;
this.ch = ch;
- this.handler = defaultHandler;
+ this.handler = service.getHandler();
+ this.processor = processor;
this.remoteAddress = (InetSocketAddress) ch.socket()
.getRemoteSocketAddress();
this.localAddress = (InetSocketAddress) ch.socket()
@@ -111,11 +92,7 @@
@Override
protected IoProcessor getProcessor() {
- if (service instanceof DatagramAcceptor) {
- return ((DatagramAcceptor) service).getProcessor();
- } else {
- return ((DatagramConnector) service).getProcessor();
- }
+ return processor;
}
public DatagramSessionConfig getConfig() {
@@ -150,16 +127,6 @@
}
@Override
- public CloseFuture close() {
- if (service instanceof IoAcceptor) {
- ((DatagramAcceptor) service).getSessionRecycler()
- .remove(this);
- }
- CloseFuture answer = super.close();
- return answer;
- }
-
- @Override
public WriteFuture write(Object message, SocketAddress destination) {
if (!this.config.isBroadcast()) {
throw new IllegalStateException("Non-broadcast session");
@@ -197,7 +164,6 @@
ch.socket().setReceiveBufferSize(receiveBufferSize);
// Re-retrieve the effective receive buffer size.
receiveBufferSize = ch.socket().getReceiveBufferSize();
- DatagramSessionImpl.this.config.setReadBufferSize(receiveBufferSize);
} catch (SocketException e) {
throw new RuntimeIOException(e);
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Sep 16 23:43:04 2007
@@ -271,7 +271,6 @@
}
if (request.exception != null) {
- // TODO better exception handling.
if (request.exception instanceof RuntimeException) {
throw (RuntimeException) request.exception;
} else if (request.exception instanceof IOException) {
@@ -472,8 +471,7 @@
// and notify.
getListeners().fireServiceActivated();
- } catch (Throwable e) // TODO better exception handling.
- {
+ } catch (Throwable e) {
req.exception = e;
} finally {
synchronized (req) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Sun Sep 16 23:43:04 2007
@@ -86,14 +86,7 @@
if (!s.getTrafficMask().isReadable()) {
s.receivedMessageQueue.add(data);
} else {
- int byteCount = 1;
- if (data instanceof ByteBuffer) {
- byteCount = ((ByteBuffer) data).remaining();
- }
-
- s.increaseReadBytes(byteCount);
-
- super.fireMessageReceived(s, data);
+ super.fireMessageReceived(data);
}
} finally {
s.getLock().unlock();
@@ -102,22 +95,22 @@
s.receivedMessageQueue.add(data);
}
} else if (type == IoEventType.WRITE) {
- super.fireFilterWrite(session, (WriteRequest) data);
+ super.fireFilterWrite((WriteRequest) data);
} else if (type == IoEventType.MESSAGE_SENT) {
- super.fireMessageSent(session, (WriteRequest) data);
+ super.fireMessageSent((WriteRequest) data);
} else if (type == IoEventType.EXCEPTION_CAUGHT) {
- super.fireExceptionCaught(session, (Throwable) data);
+ super.fireExceptionCaught((Throwable) data);
} else if (type == IoEventType.SESSION_IDLE) {
- super.fireSessionIdle(session, (IdleStatus) data);
+ super.fireSessionIdle((IdleStatus) data);
} else if (type == IoEventType.SESSION_OPENED) {
- super.fireSessionOpened(session);
+ super.fireSessionOpened();
sessionOpened = true;
} else if (type == IoEventType.SESSION_CREATED) {
- super.fireSessionCreated(session);
+ super.fireSessionCreated();
} else if (type == IoEventType.SESSION_CLOSED) {
- super.fireSessionClosed(session);
+ super.fireSessionClosed();
} else if (type == IoEventType.CLOSE) {
- super.fireFilterClose(session);
+ super.fireFilterClose();
}
}
@@ -127,52 +120,52 @@
}
@Override
- public void fireFilterClose(IoSession session) {
- pushEvent(new IoEvent(IoEventType.CLOSE, session, null));
+ public void fireFilterClose() {
+ pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
}
@Override
- public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
- pushEvent(new IoEvent(IoEventType.WRITE, session, writeRequest));
+ public void fireFilterWrite(WriteRequest writeRequest) {
+ pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
}
@Override
- public void fireExceptionCaught(IoSession session, Throwable cause) {
- pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, session, cause));
+ public void fireExceptionCaught(Throwable cause) {
+ pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
}
@Override
- public void fireMessageSent(IoSession session, WriteRequest request) {
- pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, session, request));
+ public void fireMessageSent(WriteRequest request) {
+ pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
}
@Override
- public void fireSessionClosed(IoSession session) {
- pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, session, null));
+ public void fireSessionClosed() {
+ pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
}
@Override
- public void fireSessionCreated(IoSession session) {
- pushEvent(new IoEvent(IoEventType.SESSION_CREATED, session, null));
+ public void fireSessionCreated() {
+ pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
}
@Override
- public void fireSessionIdle(IoSession session, IdleStatus status) {
- pushEvent(new IoEvent(IoEventType.SESSION_IDLE, session, status));
+ public void fireSessionIdle(IdleStatus status) {
+ pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
}
@Override
- public void fireSessionOpened(IoSession session) {
- pushEvent(new IoEvent(IoEventType.SESSION_OPENED, session, null));
+ public void fireSessionOpened() {
+ pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
}
@Override
- public void fireMessageReceived(IoSession session, Object message) {
- pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, session, message));
+ public void fireMessageReceived(Object message) {
+ pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
}
private class VmPipeIoProcessor implements IoProcessor {
- public void flush(IoSession session, WriteRequest writeRequest) {
+ public void flush(IoSession session) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
Queue<WriteRequest> queue = s.getWriteRequestQueue();
if (queue.isEmpty()) {
@@ -183,13 +176,11 @@
try {
WriteRequest req;
while ((req = queue.poll()) != null) {
- int byteCount = 0;
Object message = req.getMessage();
Object messageCopy = message;
if (message instanceof ByteBuffer) {
ByteBuffer rb = (ByteBuffer) message;
rb.mark();
- byteCount = rb.remaining();
ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
wb.put(rb);
wb.flip();
@@ -197,12 +188,9 @@
messageCopy = wb;
}
- s.increaseWrittenBytes(byteCount);
- s.increaseWrittenMessages();
-
s.getRemoteSession().getFilterChain().fireMessageReceived(
- s.getRemoteSession(), messageCopy);
- s.getFilterChain().fireMessageSent(s, req);
+ messageCopy);
+ s.getFilterChain().fireMessageSent(req);
}
} finally {
s.getLock().unlock();
@@ -240,15 +228,12 @@
List<Object> data = new ArrayList<Object>();
s.receivedMessageQueue.drainTo(data);
for (Object aData : data) {
- // TODO Optimize inefficient data transfer.
- // Data will be returned to pendingDataQueue
- // if getTraffic().isReadable() is false.
- VmPipeFilterChain.this.fireMessageReceived(s, aData);
+ VmPipeFilterChain.this.fireMessageReceived(aData);
}
}
if (s.getTrafficMask().isWritable()) {
- flush(s, null); // The second parameter is unused.
+ flush(s); // The second parameter is unused.
}
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoFilterChainTest.java Sun Sep 16 23:43:04 2007
@@ -196,13 +196,13 @@
}
private void run(String expectedResult) {
- chain.fireSessionCreated(session);
- chain.fireSessionOpened(session);
- chain.fireMessageReceived(session, new Object());
- chain.fireFilterWrite(session, new DefaultWriteRequest(new Object()));
- chain.fireSessionIdle(session, IdleStatus.READER_IDLE);
- chain.fireExceptionCaught(session, new Exception());
- chain.fireSessionClosed(session);
+ chain.fireSessionCreated();
+ chain.fireSessionOpened();
+ chain.fireMessageReceived(new Object());
+ chain.fireFilterWrite(new DefaultWriteRequest(new Object()));
+ chain.fireSessionIdle(IdleStatus.READER_IDLE);
+ chain.fireExceptionCaught(new Exception());
+ chain.fireSessionClosed();
result = formatResult(result);
expectedResult = formatResult(expectedResult);
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/IoServiceListenerSupportTest.java Sun Sep 16 23:43:04 2007
@@ -93,8 +93,8 @@
// Test creation
listener.sessionCreated(session);
- chain.fireSessionCreated(session);
- chain.fireSessionOpened(session);
+ chain.fireSessionCreated();
+ chain.fireSessionOpened();
listenerControl.replay();
chainControl.replay();
@@ -111,7 +111,7 @@
// Test destruction & other side effects
listenerControl.reset();
chainControl.reset();
- chain.fireSessionClosed(session);
+ chain.fireSessionClosed();
listener.sessionDestroyed(session);
listenerControl.replay();
@@ -154,8 +154,8 @@
// Activate a service and create a session.
listener.serviceActivated(acceptor);
listener.sessionCreated(session);
- chain.fireSessionCreated(session);
- chain.fireSessionOpened(session);
+ chain.fireSessionCreated();
+ chain.fireSessionOpened();
listenerControl.replay();
chainControl.replay();
@@ -174,8 +174,8 @@
listener.serviceDeactivated(acceptor);
acceptorControl.expectAndReturn(acceptor.isDisconnectOnUnbind(), true);
listener.sessionDestroyed(session);
- chain.fireFilterClose(session);
- chain.fireSessionClosed(session);
+ chain.fireFilterClose();
+ chain.fireSessionClosed();
listenerControl.replay();
acceptorControl.replay();
@@ -229,8 +229,8 @@
// Creating a session should activate a service automatically.
listener.serviceActivated(connector);
listener.sessionCreated(session);
- chain.fireSessionCreated(session);
- chain.fireSessionOpened(session);
+ chain.fireSessionCreated();
+ chain.fireSessionOpened();
listenerControl.replay();
chainControl.replay();
@@ -245,7 +245,7 @@
listenerControl.reset();
chainControl.reset();
listener.sessionDestroyed(session);
- chain.fireSessionClosed(session);
+ chain.fireSessionClosed();
listener.serviceDeactivated(connector);
listenerControl.replay();
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/AbstractBindTest.java Sun Sep 16 23:43:04 2007
@@ -21,14 +21,17 @@
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.Collection;
import java.util.Date;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.socket.nio.DatagramAcceptor;
@@ -56,6 +59,8 @@
protected abstract SocketAddress createSocketAddress(int port);
protected abstract int getPort(SocketAddress address);
+
+ protected abstract IoConnector newConnector();
protected void bind(boolean reuseAddress) throws IOException {
acceptor.setHandler(new EchoProtocolHandler());
@@ -146,6 +151,36 @@
for (int i = 0; i < 1024; i++) {
acceptor.unbind();
acceptor.bind();
+ }
+ }
+
+ public void testUnbindDisconnectsClients() throws Exception {
+ bind(true);
+ IoConnector connector = newConnector();
+ IoSession[] sessions = new IoSession[5];
+ connector.setHandler(new IoHandlerAdapter());
+ for (int i = 0; i < sessions.length; i++) {
+ ConnectFuture future = connector.connect(createSocketAddress(port));
+ future.awaitUninterruptibly();
+ sessions[i] = future.getSession();
+ Assert.assertTrue(sessions[i].isConnected());
+ Assert.assertTrue(sessions[i].write(ByteBuffer.allocate(1)).awaitUninterruptibly().isWritten());
+ }
+
+ // Wait for the server side sessions to be created.
+ Thread.sleep(500);
+
+ Collection<IoSession> managedSessions = acceptor.getManagedSessions();
+ Assert.assertEquals(5, managedSessions.size());
+
+ acceptor.unbind();
+
+ // Wait for the client side sessions to close.
+ Thread.sleep(500);
+
+ Assert.assertEquals(0, managedSessions.size());
+ for (IoSession element : managedSessions) {
+ Assert.assertFalse(element.isConnected());
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/DatagramBindTest.java Sun Sep 16 23:43:04 2007
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.transport.AbstractBindTest;
/**
@@ -38,11 +39,16 @@
@Override
protected SocketAddress createSocketAddress(int port) {
- return new InetSocketAddress(port);
+ return new InetSocketAddress("localhost", port);
}
@Override
protected int getPort(SocketAddress address) {
return ((InetSocketAddress) address).getPort();
+ }
+
+ @Override
+ protected IoConnector newConnector() {
+ return new DatagramConnector();
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/socket/nio/SocketBindTest.java Sun Sep 16 23:43:04 2007
@@ -21,14 +21,8 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Collection;
-import junit.framework.Assert;
-
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.AbstractBindTest;
/**
@@ -45,42 +39,16 @@
@Override
protected SocketAddress createSocketAddress(int port) {
- return new InetSocketAddress(port);
+ return new InetSocketAddress("localhost", port);
}
@Override
protected int getPort(SocketAddress address) {
return ((InetSocketAddress) address).getPort();
}
-
- public void testUnbindDisconnectsClients() throws Exception {
- // TODO: This test is almost identical to the test with the same name in VmPipeBindTest
- bind(false);
-
- IoConnector connector = new SocketConnector();
- IoSession[] sessions = new IoSession[5];
- connector.setHandler(new IoHandlerAdapter());
- for (int i = 0; i < sessions.length; i++) {
- ConnectFuture future = connector.connect(new InetSocketAddress(
- "localhost", port));
- future.awaitUninterruptibly();
- sessions[i] = future.getSession();
- Assert.assertTrue(sessions[i].isConnected());
- }
-
- // Wait for the server side sessions to be created.
- Thread.sleep(500);
-
- Collection<IoSession> managedSessions = acceptor.getManagedSessions();
- Assert.assertEquals(5, managedSessions.size());
-
- acceptor.unbind();
-
- // Wait for the client side sessions to close.
- Thread.sleep(500);
-
- for (IoSession element : sessions) {
- Assert.assertFalse(element.isConnected());
- }
+
+ @Override
+ protected IoConnector newConnector() {
+ return new SocketConnector();
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeBindTest.java Sun Sep 16 23:43:04 2007
@@ -20,14 +20,8 @@
package org.apache.mina.transport.vmpipe;
import java.net.SocketAddress;
-import java.util.Collection;
-import junit.framework.Assert;
-
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.AbstractBindTest;
/**
@@ -52,38 +46,8 @@
return ((VmPipeAddress) address).getPort();
}
- public void testUnbindDisconnectsClients() throws Exception {
- // TODO: This test is almost identical to the test with the same name in SocketBindTest
- bind(false);
-
- SocketAddress addr = createSocketAddress(port);
-
- IoConnector connector = new VmPipeConnector();
- connector.setHandler(new IoHandlerAdapter());
- IoSession[] sessions = new IoSession[5];
- for (int i = 0; i < sessions.length; i++) {
- ConnectFuture future = connector.connect(addr);
- future.awaitUninterruptibly();
- sessions[i] = future.getSession();
- Assert.assertTrue(sessions[i].isConnected());
- }
-
- // Wait for the server side sessions to be created.
- Thread.sleep(500);
-
- Collection<IoSession> managedSessions = acceptor.getManagedSessions();
- Assert.assertEquals(5, managedSessions.size());
- for (IoSession element : sessions) {
- Assert.assertFalse(managedSessions.contains(element));
- }
-
- acceptor.unbind();
-
- // Wait for the client side sessions to close.
- Thread.sleep(500);
-
- for (IoSession element : sessions) {
- Assert.assertFalse(element.isConnected());
- }
+ @Override
+ protected IoConnector newConnector() {
+ return new VmPipeConnector();
}
}
Modified: mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java?rev=576271&r1=576270&r2=576271&view=diff
==============================================================================
--- mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java (original)
+++ mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java Sun Sep 16 23:43:04 2007
@@ -168,11 +168,9 @@
ByteBuffer buf = (ByteBuffer) req.getMessage();
if (buf.remaining() == 0) {
getWriteRequestQueue().poll();
- this.increaseWrittenMessages();
-
buf.reset();
- this.getFilterChain().fireMessageSent(this, req);
+ this.getFilterChain().fireMessageSent(req);
continue;
}
@@ -180,9 +178,8 @@
try {
outputStream.write(buf.array());
buf.position(buf.position() + writtenBytes);
- this.increaseWrittenBytes(writtenBytes);
} catch (IOException e) {
- this.getFilterChain().fireExceptionCaught(this, e);
+ this.getFilterChain().fireExceptionCaught(e);
}
}
}
@@ -209,17 +206,16 @@
int readBytes = inputStream.read(data);
if (readBytes > 0) {
- increaseReadBytes(readBytes);
ByteBuffer buf = ByteBuffer
.wrap(data, 0, readBytes);
buf.put(data, 0, readBytes);
buf.flip();
getFilterChain().fireMessageReceived(
- SerialSessionImpl.this, buf);
+ buf);
}
} catch (IOException e) {
getFilterChain().fireExceptionCaught(
- SerialSessionImpl.this, e);
+ e);
}
}
}
@@ -242,7 +238,7 @@
public void add(IoSession session) {
}
- public void flush(IoSession session, WriteRequest writeRequest) {
+ public void flush(IoSession session) {
if (writeWorker == null) {
writeWorker = new WriteWorker();
writeWorker.start();
@@ -266,7 +262,7 @@
}
port.close();
- flush(session, null);
+ flush(session);
synchronized (readReadyMonitor) {
readReadyMonitor.notifyAll();
}