You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2012/11/10 09:20:42 UTC

svn commit: r1407731 - in /mina/mina/trunk/core/src/main/java/org/apache/mina: api/ session/ transport/nio/ transport/tcp/

Author: elecharny
Date: Sat Nov 10 08:20:40 2012
New Revision: 1407731

URL: http://svn.apache.org/viewvc?rev=1407731&view=rev
Log:
o Removed the synchronization around the writeQueue
o Split the ready() method into processRead() and processWrite() methods
o Added some Javadoc and comments

Modified:
    mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoServer.java Sat Nov 10 08:20:40 2012
@@ -24,13 +24,12 @@ import java.net.SocketAddress;
 
 /**
  * 
- * A network serer bound to a local addresse
+ * A network server bound to a local address
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  * 
  */
 public interface IoServer extends IoService {
-
     /**
      * Returns the local addresses which are bound currently.
      */
@@ -58,5 +57,4 @@ public interface IoServer extends IoServ
      * @throws IOException if failed to unbind
      */
     void unbind() throws IOException;
-
 }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Sat Nov 10 08:20:40 2012
@@ -378,27 +378,9 @@ public interface IoSession {
     public WriteRequest enqueueWriteRequest(Object message);
 
     /**
-     * Get the {@link Queue} of this session. The write queue contains the pending writes. This
-     * method will lock the WriteQueue using the WriteLock lock. The {@link releaseWriteQueue()
-     * method must be called when finished : <br/>
-     * <code>
-     *   try {
-     *       Queue<WriteRequest> queue = session.acquireWriteQueue();
-     *       ...
-     *       // We use the queue here
-     *       ...
-     *   } finally {
-     *       session.releaseWriteQueue();
-     *   }
-     * <code>
+     * Get the {@link Queue} of this session. The write queue contains the pending writes. 
      * 
      * @return the write queue of this session
      */
-    public Queue<WriteRequest> acquireWriteQueue();
-
-    /**
-     * Release the WriteQueue after having acquired it with the {@link acquireWriteQeuee()} method.
-     */
-    public void releaseWriteQueue();
-
+    public Queue<WriteRequest> getWriteQueue();
 }
\ No newline at end of file

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Sat Nov 10 08:20:40 2012
@@ -118,15 +118,6 @@ public abstract class AbstractIoSession 
     /** the queue of pending writes for the session, to be dequeued by the {@link SelectorProcessor} */
     private final Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
 
-    /** A lock to protect the access to the write queue */
-    private final ReadWriteLock writeQueueLock = new ReentrantReadWriteLock();
-
-    /** A Read lock on the reentrant writeQueue lock */
-    private final Lock writeQueueReadLock = writeQueueLock.readLock();
-
-    /** A Write lock on the reentrant writeQueue lock */
-    private final Lock writeQueueWriteLock = writeQueueLock.writeLock();
-
     // ------------------------------------------------------------------------
     // Filter chain
     // ------------------------------------------------------------------------
@@ -533,27 +524,20 @@ public abstract class AbstractIoSession 
     public WriteRequest enqueueWriteRequest(final Object message) {
         WriteRequest request = null;
 
-        try {
-            // Lock the queue while the message is written into it
-            writeQueueReadLock.lock();
-
-            if (isConnectedSecured()) {
-                // SSL/TLS : we have to encrypt the message
-                final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+        if (isConnectedSecured()) {
+            // SSL/TLS : we have to encrypt the message
+            final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
 
-                if (sslHelper == null) {
-                    throw new IllegalStateException();
-                }
+            if (sslHelper == null) {
+                throw new IllegalStateException();
+            }
 
-                request = sslHelper.processWrite(this, message, writeQueue);
-            } else {
-                // Plain message
-                request = new DefaultWriteRequest(message);
+            request = sslHelper.processWrite(this, message, writeQueue);
+        } else {
+            // Plain message
+            request = new DefaultWriteRequest(message);
 
-                writeQueue.add(request);
-            }
-        } finally {
-            writeQueueReadLock.unlock();
+            writeQueue.add(request);
         }
 
         // If it wasn't, we register this session as interested to write.
@@ -575,19 +559,10 @@ public abstract class AbstractIoSession 
      * {@inheritDoc}
      */
     @Override
-    public Queue<WriteRequest> acquireWriteQueue() {
-        writeQueueWriteLock.lock();
+    public Queue<WriteRequest> getWriteQueue() {
         return writeQueue;
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void releaseWriteQueue() {
-        writeQueueWriteLock.unlock();
-    }
-
     // ------------------------------------------------------------------------
     // Close session management
     // ------------------------------------------------------------------------

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Sat Nov 10 08:20:40 2012
@@ -34,23 +34,35 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * This class holds a Selector and handles all the incoming events for the
+ * sessions registered on this selector. All the events will be processed
+ * by some dedicated thread, taken from a pool.
+ * It will loop forever, until the instance is stopped.
+ *  
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class NioSelectorLoop implements SelectorLoop {
-
+    /** The logger for this class */
     private final Logger logger;
 
     /** the selector managed by this class */
     private Selector selector;
 
-    /** the worker thread in charge of polling the selector */
+    /** the worker thread in charge of processing the events */
     private final SelectorWorker worker;
 
     /** Read buffer for all the incoming bytes (default to 64Kb) */
     private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
 
+    /** The queue containing the channels to register on the selector */
     private final Queue<Registration> registrationQueue = new ConcurrentLinkedQueue<Registration>();
 
+    /**
+     * Creates an instance of the SelectorLoop.
+     * 
+     * @param prefix 
+     * @param index
+     */
     public NioSelectorLoop(final String prefix, final int index) {
         logger = LoggerFactory.getLogger(NioSelectorLoop.class.getName() + ":" + prefix + "-" + index);
         worker = new SelectorWorker(prefix, index);
@@ -74,20 +86,25 @@ public class NioSelectorLoop implements 
     public void register(final boolean accept, final boolean read, final boolean write,
             final SelectorListener listener, final SelectableChannel channel) {
         logger.debug("registering : {} for accept : {}, read : {}, write : {}", new Object[] { listener, accept, read,
-                                write });
+                write });
         int ops = 0;
+
         if (accept) {
             ops |= SelectionKey.OP_ACCEPT;
         }
+
         if (read) {
             ops |= SelectionKey.OP_READ;
         }
+
         if (write) {
             ops |= SelectionKey.OP_WRITE;
         }
 
         // TODO : if it's the same selector/worker, we don't need to do that we could directly enqueue
         registrationQueue.add(new Registration(ops, channel, listener));
+
+        // Now, wakeup the selector in order to let it update the selectionKey status
         selector.wakeup();
     }
 
@@ -98,7 +115,7 @@ public class NioSelectorLoop implements 
     public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
             final SelectorListener listener, final SelectableChannel channel) {
         logger.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
-                                accept, read, write });
+                accept, read, write });
 
         final SelectionKey key = channel.keyFor(selector);
         if (key == null) {
@@ -137,8 +154,8 @@ public class NioSelectorLoop implements 
     }
 
     /**
-     * The worker processing incoming session creation, session destruction requests, session write and reads. It will
-     * also bind new servers.
+     * The worker processing incoming session creation, session destruction requests, 
+     * session write and reads. It will also bind new servers.
      */
     private class SelectorWorker extends Thread {
 
@@ -156,6 +173,7 @@ public class NioSelectorLoop implements 
                     final int readyCount = selector.select();
                     logger.debug("... done selecting : {} events", readyCount);
                     final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+
                     while (it.hasNext()) {
                         final SelectionKey key = it.next();
                         final SelectorListener listener = (SelectorListener) key.attachment();
@@ -193,7 +211,9 @@ public class NioSelectorLoop implements 
         }
 
         private final int ops;
+
         private final SelectableChannel channel;
+
         private final SelectorListener listener;
     }
 }
\ No newline at end of file

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Sat Nov 10 08:20:40 2012
@@ -41,9 +41,10 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
+    /** A logger for this class */
     static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
 
-    // the bound local address
+    /** the bound local address */
     private SocketAddress address = null;
 
     private final SelectorLoop acceptSelectorLoop;
@@ -190,6 +191,7 @@ public class NioTcpServer extends Abstra
                 LOG.error("error while accepting new client", e);
             }
         }
+
         if (read || write) {
             throw new IllegalStateException("should not receive read or write events");
         }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Sat Nov 10 08:20:40 2012
@@ -206,126 +206,156 @@ public class NioTcpSession extends Abstr
     }
 
     /**
-     * {@inheritDoc}
+     * Process a read operation : read the data from the channel and push
+     * them to the chain.
+     * @param readBuffer The buffer that will contain the read data
      */
-    @Override
-    public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
-        if (read) {
-            try {
+    private void processRead(final ByteBuffer readBuffer) {
+        try {
+            LOG.debug("readable session : {}", this);
+
+            // First reset the buffer from what it contained before
+            readBuffer.clear();
+
+            // Read everything we can up to the buffer size
+            final int readCount = channel.read(readBuffer);
+
+            LOG.debug("read {} bytes", readCount);
+
+            if (readCount < 0) {
+                // session closed by the remote peer
+                LOG.debug("session closed by the remote peer");
+                close(true);
+            } else if (readCount > 0) {
+                // we have read some data
+                // limit at the current position & rewind buffer back to start &
+                // push to the chain
+                readBuffer.flip();
+
+                if (isSecured()) {
+                    // We are reading data over a SSL/TLS encrypted connection.
+                    // Redirect the processing to the SslHelper class.
+                    final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
 
-                LOG.debug("readable session : {}", this);
-                readBuffer.clear();
-                final int readCount = channel.read(readBuffer);
-
-                LOG.debug("read {} bytes", readCount);
-
-                if (readCount < 0) {
-                    // session closed by the remote peer
-                    LOG.debug("session closed by the remote peer");
-                    close(true);
-                } else {
-                    // we have read some data
-                    // limit at the current position & rewind buffer back to start &
-                    // push to the chain
-                    readBuffer.flip();
-
-                    if (isSecured()) {
-                        // We are reading data over a SSL/TLS encrypted connection.
-                        // Redirect
-                        // the processing to the SslHelper class.
-                        final SslHelper sslHelper = getAttribute(SSL_HELPER, null);
-
-                        if (sslHelper == null) {
-                            throw new IllegalStateException();
-                        }
-
-                        sslHelper.processRead(this, readBuffer);
-                    } else {
-                        // Plain message, not encrypted : go directly to the chain
-                        processMessageReceived(readBuffer);
+                    if (sslHelper == null) {
+                        throw new IllegalStateException();
                     }
 
-                    idleChecker.sessionRead(this, System.currentTimeMillis());
+                    sslHelper.processRead(this, readBuffer);
+                } else {
+                    // Plain message, not encrypted : go directly to the chain
+                    processMessageReceived(readBuffer);
                 }
-            } catch (final IOException e) {
-                LOG.error("Exception while reading : ", e);
-            }
 
+                // Update the session idle status
+                idleChecker.sessionRead(this, System.currentTimeMillis());
+            }
+        } catch (final IOException e) {
+            LOG.error("Exception while reading : ", e);
         }
-        if (write) {
-            try {
-                LOG.debug("ready for write");
-                LOG.debug("writable session : {}", this);
-
-                setNotRegisteredForWrite();
-
-                // write from the session write queue
-                boolean isEmpty = false;
-
-                try {
-                    final Queue<WriteRequest> queue = acquireWriteQueue();
-
-                    do {
-                        // get a write request from the queue
-                        final WriteRequest wreq = queue.peek();
-
-                        if (wreq == null) {
-                            break;
-                        }
-
-                        final ByteBuffer buf = (ByteBuffer) wreq.getMessage();
-
-                        // Note that if the connection is secured, the buffer
-                        // already
-                        // contains encrypted data.
-                        final int wrote = getSocketChannel().write(buf);
-                        incrementWrittenBytes(wrote);
-                        LOG.debug("wrote {} bytes to {}", wrote, this);
-
-                        idleChecker.sessionWritten(this, System.currentTimeMillis());
-
-                        if (buf.remaining() == 0) {
-                            // completed write request, let's remove it
-                            queue.remove();
-                            // complete the future
-                            final DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
-
-                            if (future != null) {
-                                future.complete();
-                            }
-                            // generate the message sent event
-                            final Object highLevel = ((DefaultWriteRequest) wreq).getHighLevelMessage();
-                            if (highLevel != null) {
-                                processMessageSent(highLevel);
-                            }
-                        } else {
-                            // output socket buffer is full, we need
-                            // to give up until next selection for
-                            // writing
-                            break;
-                        }
-                    } while (!queue.isEmpty());
-
-                    isEmpty = queue.isEmpty();
-                } finally {
-                    this.releaseWriteQueue();
+    }
+
+    /**
+     * Process a write operation. This will be executed only because the session
+     * has something to write into the channel.
+     */
+    private void processWrite() {
+        try {
+            LOG.debug("ready for write");
+            LOG.debug("writable session : {}", this);
+
+            Queue<WriteRequest> queue = getWriteQueue();
+
+            do {
+                // get a write request from the queue. We leave it in the queue,
+                // just in case we can't write all of the message content into
+                // the channel : we will have to retrieve the message later
+                final WriteRequest writeRequest = queue.peek();
+
+                if (writeRequest == null) {
+                    // Nothing to write : we are done
+                    break;
+                }
+
+                // The message is necessarily a ByteBuffer at this point
+                final ByteBuffer buf = (ByteBuffer) writeRequest.getMessage();
+
+                // Note that if the connection is secured, the buffer
+                // already contains encrypted data.
+
+                // Try to write the data, and get back the number of bytes
+                // actually written
+                final int written = channel.write(buf);
+                LOG.debug("wrote {} bytes to {}", written, this);
+
+                if (written > 0) {
+                    incrementWrittenBytes(written);
                 }
 
-                // if the session is no more interested in writing, we need
-                // to stop listening for OP_WRITE events
-                if (isEmpty) {
-                    if (isClosing()) {
-                        LOG.debug("closing session {} have empty write queue, so we close it", this);
-                        // we was flushing writes, now we to the close
-                        channelClose();
-                    } else {
-                        // no more write event needed
-                        selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+                // Update the idle status for this session
+                idleChecker.sessionWritten(this, System.currentTimeMillis());
+
+                // Ok, we may not have written everything. Check that.
+                if (buf.remaining() == 0) {
+                    // completed write request, let's remove it (we use poll() instead
+                    // of remove(), because remove() may throw an exception if the
+                    // queue is empty).
+                    queue.poll();
+
+                    // complete the future if we have one (we should...)
+                    final DefaultWriteFuture future = (DefaultWriteFuture) writeRequest.getFuture();
+
+                    if (future != null) {
+                        future.complete();
                     }
+
+                    // generate the message sent event
+                    final Object highLevel = ((DefaultWriteRequest) writeRequest).getHighLevelMessage();
+
+                    if (highLevel != null) {
+                        processMessageSent(highLevel);
+                    }
+                } else {
+                    // output socket buffer is full, we need
+                    // to give up until next selection for
+                    // writing. 
+                    break;
                 }
-            } catch (final IOException e) {
-                LOG.error("Exception while reading : ", e);
+            } while (!queue.isEmpty());
+
+            // We may have exited from the loop for some other reason 
+            // than an empty queue
+            // if the session is no more interested in writing, we need
+            // to stop listening for OP_WRITE events
+            if (queue.isEmpty()) {
+                if (isClosing()) {
+                    LOG.debug("closing session {} have empty write queue, so we close it", this);
+                    // we were flushing writes, now we do the close
+                    channelClose();
+                } else {
+                    // no more write event needed
+                    selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+                }
+            } else {
+                // We have some more data to write : the channel OP_WRITE interest remains 
+                // as it was.
             }
+        } catch (final IOException e) {
+            LOG.error("Exception while reading : ", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void ready(final boolean accept, final boolean read, final ByteBuffer readBuffer, final boolean write) {
+        if (read) {
+            processRead(readBuffer);
+        }
+
+        if (write) {
+            processWrite();
         }
         if (accept) {
             throw new IllegalStateException("accept event should never occur on NioTcpSession");

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Sat Nov 10 08:20:40 2012
@@ -24,7 +24,16 @@ import java.nio.channels.SelectableChann
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public interface SelectorLoop {
-
+    /**
+     * Register a channel on a Selector, for some events. We can register for OP_ACCEPT,
+     * OP_READ or OP_WRITE.
+     * 
+     * @param accept Registers for OP_ACCEPT events
+     * @param read Registers for OP_READ events
+     * @param write Registers for OP_WRITE events
+     * @param listener The listener
+     * @param channel
+     */
     public abstract void register(boolean accept, boolean read, boolean write, SelectorListener listener,
             SelectableChannel channel);
 

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java?rev=1407731&r1=1407730&r2=1407731&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/AbstractTcpServer.java Sat Nov 10 08:20:40 2012
@@ -27,7 +27,7 @@ import org.apache.mina.service.server.Ab
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public abstract class AbstractTcpServer extends AbstractIoServer {
-    // the default session configuration
+    /** the default session configuration */
     private TcpSessionConfig config;
 
     /**
@@ -53,5 +53,4 @@ public abstract class AbstractTcpServer 
     public void setSessionConfig(final TcpSessionConfig config) {
         this.config = config;
     }
-
 }
\ No newline at end of file