You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2012/11/10 09:20:42 UTC
svn commit: r1407731 - in
/mina/mina/trunk/core/src/main/java/org/apache/mina: api/ session/
transport/nio/ transport/tcp/
Author: elecharny
Date: Sat Nov 10 08:20:40 2012
New Revision: 1407731
URL: http://svn.apache.org/viewvc?rev=1407731&view=rev
Log:
o Removed the synchronization around the writeQueue
o Split the ready() method into a procesRead() and processWrite() methods
o Added some Javadoc and comments
Modified:
mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java Sat Nov 10 08:20:40 2012
@@ -24,13 +24,12 @@ import java.net.SocketAddress;
/**
*
- * A network serer bound to a local addresse
+ * A network server bound to a local address
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
*/
public interface IoServer extends IoService {
-
/**
* Returns the local addresses which are bound currently.
*/
@@ -58,5 +57,4 @@ public interface IoServer extends IoServ
* @throws IOException if failed to unbind
*/
void unbind() throws IOException;
-
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Sat Nov 10 08:20:40 2012
@@ -378,27 +378,9 @@ public interface IoSession {
public WriteRequest enqueueWriteRequest(Object message);
/**
- * Get the {@link Queue} of this session. The write queue contains the pending writes. This
- * method will lock the WriteQueue using the WriteLock lock. The {@link releaseWriteQueue()
- * method must be called when finished : <br/>
- * <code>
- * try {
- * Queue<WriteRequest> queue = session.acquireWriteQueue();
- * ...
- * // We use the queue here
- * ...
- * } finally {
- * session.releaseWriteQueue();
- * }
- * <code>
+ * Get the {@link Queue} of this session. The write queue contains the pending writes.
*
* @return the write queue of this session
*/
- public Queue<WriteRequest> acquireWriteQueue();
-
- /**
- * Release the WriteQueue after having acquired it with the {@link acquireWriteQeuee()} method.
- */
- public void releaseWriteQueue();
-
+ public Queue<WriteRequest> getWriteQueue();
}
\ No newline at end of file
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Sat Nov 10 08:20:40 2012
@@ -118,15 +118,6 @@ public abstract class AbstractIoSession
/** the queue of pending writes for the session, to be dequeued by the {@link SelectorProcessor} */
private final Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
- /** A lock to protect the access to the write queue */
- private final ReadWriteLock writeQueueLock = new ReentrantReadWriteLock();
-
- /** A Read lock on the reentrant writeQueue lock */
- private final Lock writeQueueReadLock = writeQueueLock.readLock();
-
- /** A Write lock on the reentrant writeQueue lock */
- private final Lock writeQueueWriteLock = writeQueueLock.writeLock();
-
// ------------------------------------------------------------------------
// Filter chain
// ------------------------------------------------------------------------
@@ -533,27 +524,20 @@ public abstract class AbstractIoSession
public WriteRequest enqueueWriteRequest(final Object message) {
WriteRequest request = null;
- try {
- // Lock the queue while the message is written into it
- writeQueueReadLock.lock();
-
- if (isConnectedSecured()) {
- // SSL/TLS : we have to encrypt the message
- final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+ if (isConnectedSecured()) {
+ // SSL/TLS : we have to encrypt the message
+ final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
+ if (sslHelper == null) {
+ throw new IllegalStateException();
+ }
- request = sslHelper.processWrite(this, message, writeQueue);
- } else {
- // Plain message
- request = new DefaultWriteRequest(message);
+ request = sslHelper.processWrite(this, message, writeQueue);
+ } else {
+ // Plain message
+ request = new DefaultWriteRequest(message);
- writeQueue.add(request);
- }
- } finally {
- writeQueueReadLock.unlock();
+ writeQueue.add(request);
}
// If it wasn't, we register this session as interested to write.
@@ -575,19 +559,10 @@ public abstract class AbstractIoSession
* {@inheritDoc}
*/
@Override
- public Queue<WriteRequest> acquireWriteQueue() {
- writeQueueWriteLock.lock();
+ public Queue<WriteRequest> getWriteQueue() {
return writeQueue;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void releaseWriteQueue() {
- writeQueueWriteLock.unlock();
- }
-
// ------------------------------------------------------------------------
// Close session management
// ------------------------------------------------------------------------
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Sat Nov 10 08:20:40 2012
@@ -34,23 +34,35 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * This class holds a Selector and handle all the incoming events for the
+ * sessions registered on this selector.ALl the events will be processed
+ * by some dedicated thread, taken from a pool.
+ * It will loop forever, untill the instance is stopped.
+ *
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class NioSelectorLoop implements SelectorLoop {
-
+ /** The logger for this class */
private final Logger logger;
/** the selector managed by this class */
private Selector selector;
- /** the worker thread in charge of polling the selector */
+ /** the worker thread in charge of processing the events */
private final SelectorWorker worker;
/** Read buffer for all the incoming bytes (default to 64Kb) */
private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
+ /** The queue containing the channels to register on the selector */
private final Queue<Registration> registrationQueue = new ConcurrentLinkedQueue<Registration>();
+ /**
+ * Creates an instance of the SelectorLoop.
+ *
+ * @param prefix
+ * @param index
+ */
public NioSelectorLoop(final String prefix, final int index) {
logger = LoggerFactory.getLogger(NioSelectorLoop.class.getName() + ":" + prefix + "-" + index);
worker = new SelectorWorker(prefix, index);
@@ -74,20 +86,25 @@ public class NioSelectorLoop implements
public void register(final boolean accept, final boolean read, final boolean write,
final SelectorListener listener, final SelectableChannel channel) {
logger.debug("registering : {} for accept : {}, read : {}, write : {}", new Object[] { listener, accept, read,
- write });
+ write });
int ops = 0;
+
if (accept) {
ops |= SelectionKey.OP_ACCEPT;
}
+
if (read) {
ops |= SelectionKey.OP_READ;
}
+
if (write) {
ops |= SelectionKey.OP_WRITE;
}
// TODO : if it's the same selector/worker, we don't need to do that we could directly enqueue
registrationQueue.add(new Registration(ops, channel, listener));
+
+ // Now, wakeup the selector in order to let it update the selectionKey status
selector.wakeup();
}
@@ -98,7 +115,7 @@ public class NioSelectorLoop implements
public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
final SelectorListener listener, final SelectableChannel channel) {
logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
- accept, read, write });
+ accept, read, write });
final SelectionKey key = channel.keyFor(selector);
if (key == null) {
@@ -137,8 +154,8 @@ public class NioSelectorLoop implements
}
/**
- * The worker processing incoming session creation, session destruction requests, session write and reads. It will
- * also bind new servers.
+ * The worker processing incoming session creation, session destruction requests,
+ * session write and reads. It will also bind new servers.
*/
private class SelectorWorker extends Thread {
@@ -156,6 +173,7 @@ public class NioSelectorLoop implements
final int readyCount = selector.select();
logger.debug("... done selecting : {} events", readyCount);
final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+
while (it.hasNext()) {
final SelectionKey key = it.next();
final SelectorListener listener = (SelectorListener) key.attachment();
@@ -193,7 +211,9 @@ public class NioSelectorLoop implements
}
private final int ops;
+
private final SelectableChannel channel;
+
private final SelectorListener listener;
}
}
\ No newline at end of file
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Sat Nov 10 08:20:40 2012
@@ -41,9 +41,10 @@ import org.slf4j.LoggerFactory;
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
+ /** A logger for this class */
static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
- // the bound local address
+ /** the bound local address */
private SocketAddress address = null;
private final SelectorLoop acceptSelectorLoop;
@@ -190,6 +191,7 @@ public class NioTcpServer extends Abstra
LOG.error("error while accepting new client", e);
}
}
+
if (read || write) {
throw new IllegalStateException("should not receive read or write events");
}
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Sat Nov 10 08:20:40 2012
@@ -206,126 +206,156 @@ public class NioTcpSession extends Abstr
}
/**
- * {@inheritDoc}
+ * Process a read operation : read the data from the channel and push
+ * them to the chain.
+ * @param readBuffer The buffer that will contain the read data
*/
- @Override
- public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
- if (read) {
- try {
+ private void processRead(final ByteBuffer readBuffer) {
+ try {
+ LOG.debug("readable session : {}", this);
+
+ // First reset the buffer from what it contained before
+ readBuffer.clear();
+
+ // Read everything we can up to the buffer size
+ final int readCount = channel.read(readBuffer);
+
+ LOG.debug("read {} bytes", readCount);
+
+ if (readCount < 0) {
+ // session closed by the remote peer
+ LOG.debug("session closed by the remote peer");
+ close(true);
+ } else if (readCount > 0) {
+ // we have read some data
+ // limit at the current position & rewind buffer back to start &
+ // push to the chain
+ readBuffer.flip();
+
+ if (isSecured()) {
+ // We are reading data over a SSL/TLS encrypted connection.
+ // Redirect the processing to the SslHelper class.
+ final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
- LOG.debug("readable session : {}", this);
- readBuffer.clear();
- final int readCount = channel.read(readBuffer);
-
- LOG.debug("read {} bytes", readCount);
-
- if (readCount < 0) {
- // session closed by the remote peer
- LOG.debug("session closed by the remote peer");
- close(true);
- } else {
- // we have read some data
- // limit at the current position & rewind buffer back to start &
- // push to the chain
- readBuffer.flip();
-
- if (isSecured()) {
- // We are reading data over a SSL/TLS encrypted connection.
- // Redirect
- // the processing to the SslHelper class.
- final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
-
- if (sslHelper == null) {
- throw new IllegalStateException();
- }
-
- sslHelper.processRead(this, readBuffer);
- } else {
- // Plain message, not encrypted : go directly to the chain
- processMessageReceived(readBuffer);
+ if (sslHelper == null) {
+ throw new IllegalStateException();
}
- idleChecker.sessionRead(this, System.currentTimeMillis());
+ sslHelper.processRead(this, readBuffer);
+ } else {
+ // Plain message, not encrypted : go directly to the chain
+ processMessageReceived(readBuffer);
}
- } catch (final IOException e) {
- LOG.error("Exception while reading : ", e);
- }
+ // Update the session idle status
+ idleChecker.sessionRead(this, System.currentTimeMillis());
+ }
+ } catch (final IOException e) {
+ LOG.error("Exception while reading : ", e);
}
- if (write) {
- try {
- LOG.debug("ready for write");
- LOG.debug("writable session : {}", this);
-
- setNotRegisteredForWrite();
-
- // write from the session write queue
- boolean isEmpty = false;
-
- try {
- final Queue<WriteRequest> queue = acquireWriteQueue();
-
- do {
- // get a write request from the queue
- final WriteRequest wreq = queue.peek();
-
- if (wreq == null) {
- break;
- }
-
- final ByteBuffer buf = (ByteBuffer) wreq.getMessage();
-
- // Note that if the connection is secured, the buffer
- // already
- // contains encrypted data.
- final int wrote = getSocketChannel().write(buf);
- incrementWrittenBytes(wrote);
- LOG.debug("wrote {} bytes to {}", wrote, this);
-
- idleChecker.sessionWritten(this, System.currentTimeMillis());
-
- if (buf.remaining() == 0) {
- // completed write request, let's remove it
- queue.remove();
- // complete the future
- final DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
-
- if (future != null) {
- future.complete();
- }
- // generate the message sent event
- final Object highLevel = ((DefaultWriteRequest) wreq).getHighLevelMessage();
- if (highLevel != null) {
- processMessageSent(highLevel);
- }
- } else {
- // output socket buffer is full, we need
- // to give up until next selection for
- // writing
- break;
- }
- } while (!queue.isEmpty());
-
- isEmpty = queue.isEmpty();
- } finally {
- this.releaseWriteQueue();
+ }
+
+ /**
+ * Process a write operation. This will be executed only because the session
+ * has something to write into the channel.
+ */
+ private void processWrite() {
+ try {
+ LOG.debug("ready for write");
+ LOG.debug("writable session : {}", this);
+
+ Queue<WriteRequest> queue = getWriteQueue();
+
+ do {
+ // get a write request from the queue. We left it in the queue,
+ // just in case we can't write all of the message content into
+ // the channel : we will have to retrieve the message later
+ final WriteRequest writeRequest = queue.peek();
+
+ if (writeRequest == null) {
+ // Nothing to write : we are done
+ break;
+ }
+
+ // The message is necessarily a ByteBuffer at this point
+ final ByteBuffer buf = (ByteBuffer) writeRequest.getMessage();
+
+ // Note that if the connection is secured, the buffer
+ // already contains encrypted data.
+
+ // Try to write the data, and get back the number of bytes
+ // actually written
+ final int written = channel.write(buf);
+ LOG.debug("wrote {} bytes to {}", written, this);
+
+ if (written > 0) {
+ incrementWrittenBytes(written);
}
- // if the session is no more interested in writing, we need
- // to stop listening for OP_WRITE events
- if (isEmpty) {
- if (isClosing()) {
- LOG.debug("closing session {} have empty write queue, so we close it", this);
- // we was flushing writes, now we to the close
- channelClose();
- } else {
- // no more write event needed
- selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+ // Update the idle status for this session
+ idleChecker.sessionWritten(this, System.currentTimeMillis());
+
+ // Ok, we may not have written everything. Check that.
+ if (buf.remaining() == 0) {
+ // completed write request, let's remove it (we use poll() instead
+ // of remove(), because remove() may throw an exception if the
+ // queue is empty.
+ queue.poll();
+
+ // complete the future if we have one (we should...)
+ final DefaultWriteFuture future = (DefaultWriteFuture) writeRequest.getFuture();
+
+ if (future != null) {
+ future.complete();
}
+
+ // generate the message sent event
+ final Object highLevel = ((DefaultWriteRequest) writeRequest).getHighLevelMessage();
+
+ if (highLevel != null) {
+ processMessageSent(highLevel);
+ }
+ } else {
+ // output socket buffer is full, we need
+ // to give up until next selection for
+ // writing.
+ break;
}
- } catch (final IOException e) {
- LOG.error("Exception while reading : ", e);
+ } while (!queue.isEmpty());
+
+ // We may have exited from the loop for some other reason
+ // that an empty queue
+ // if the session is no more interested in writing, we need
+ // to stop listening for OP_WRITE events
+ if (queue.isEmpty()) {
+ if (isClosing()) {
+ LOG.debug("closing session {} have empty write queue, so we close it", this);
+ // we was flushing writes, now we to the close
+ channelClose();
+ } else {
+ // no more write event needed
+ selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+ }
+ } else {
+ // We have some more data to write : the channel OP_WRITE interest remains
+ // as it was.
}
+ } catch (final IOException e) {
+ LOG.error("Exception while reading : ", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+ if (read) {
+ processRead(readBuffer);
+ }
+
+ if (write) {
+ processWrite();
}
if (accept) {
throw new IllegalStateException("accept event should never occur on NioTcpSession");
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Sat Nov 10 08:20:40 2012
@@ -24,7 +24,16 @@ import java.nio.channels.SelectableChann
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public interface SelectorLoop {
-
+ /**
+ * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
+ * OP_READ or OP_WRITE.
+ *
+ * @param accept Registers for OP_ACCEPT events
+ * @param read Registers for OP_READ events
+ * @param write Registers for OP_WRITE events
+ * @param listener The listener
+ * @param channel
+ */
public abstract void register(boolean accept, boolean read, boolean write, SelectorListener listener,
SelectableChannel channel);
Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java Sat Nov 10 08:20:40 2012
@@ -27,7 +27,7 @@ import org.apache.mina.service.server.Ab
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
public abstract class AbstractTcpServer extends AbstractIoServer {
- // the default session configuration
+ /** the default session configuration */
private TcpSessionConfig config;
/**
@@ -53,5 +53,4 @@ public abstract class AbstractTcpServer
public void setSessionConfig(final TcpSessionConfig config) {
this.config = config;
}
-
}
\ No newline at end of file