You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2012/11/16 02:12:06 UTC

svn commit: r1410137 - in /mina/mina/trunk/core/src/main/java/org/apache/mina: session/AbstractIoSession.java session/SslHelper.java transport/nio/NioTcpSession.java

Author: elecharny
Date: Fri Nov 16 01:12:05 2012
New Revision: 1410137

URL: http://svn.apache.org/viewvc?rev=1410137&view=rev
Log:
o Synchronized the selector thread and the worker thread on the writeQueue so that the OP_WRITE flag is set safely.

Modified:
    mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
    mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1410137&r1=1410136&r2=1410137&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Fri Nov 16 01:12:05 2012
@@ -537,14 +537,16 @@ public abstract class AbstractIoSession 
         } else {
             // Plain message
             request = new DefaultWriteRequest(message);
+        }
 
+        synchronized (writeQueue) {
             writeQueue.add(request);
-        }
 
-        // If it wasn't, we register this session as interested to write.
-        // It's done in atomic fashion for avoiding two concurrent registering.
-        if (!registeredForWrite.getAndSet(true)) {
-            flushWriteQueue();
+            // If it wasn't, we register this session as interested to write.
+            // It's done in atomic fashion for avoiding two concurrent registering.
+            if (!registeredForWrite.getAndSet(true)) {
+                flushWriteQueue();
+            }
         }
 
         return request;

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java?rev=1410137&r1=1410136&r2=1410137&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java Fri Nov 16 01:12:05 2012
@@ -448,8 +448,6 @@ public class SslHelper {
                     appBuffer.flip();
                     WriteRequest request = new DefaultWriteRequest(appBuffer);
 
-                    writeQueue.add(request);
-
                     return request;
                 }
             }

Modified: mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1410137&r1=1410136&r2=1410137&view=diff
==============================================================================
--- mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Fri Nov 16 01:12:05 2012
@@ -264,13 +264,13 @@ public class NioTcpSession extends Abstr
             LOG.debug("ready for write");
             LOG.debug("writable session : {}", this);
 
-            Queue<WriteRequest> queue = getWriteQueue();
+            Queue<WriteRequest> writeQueue = getWriteQueue();
 
             do {
                 // get a write request from the queue. We left it in the queue,
                 // just in case we can't write all of the message content into
                 // the channel : we will have to retrieve the message later
-                final WriteRequest writeRequest = queue.peek();
+                final WriteRequest writeRequest = writeQueue.peek();
 
                 if (writeRequest == null) {
                     // Nothing to write : we are done
@@ -300,7 +300,7 @@ public class NioTcpSession extends Abstr
                     // completed write request, let's remove it (we use poll() instead
                     // of remove(), because remove() may throw an exception if the
                     // queue is empty.
-                    queue.poll();
+                    writeQueue.poll();
 
                     // complete the future if we have one (we should...)
                     final DefaultWriteFuture future = (DefaultWriteFuture) writeRequest.getFuture();
@@ -321,24 +321,29 @@ public class NioTcpSession extends Abstr
                     // writing. 
                     break;
                 }
-            } while (!queue.isEmpty());
+            } while (!writeQueue.isEmpty());
 
             // We may have exited from the loop for some other reason 
             // that an empty queue
             // if the session is no more interested in writing, we need
             // to stop listening for OP_WRITE events
-            if (queue.isEmpty()) {
-                if (isClosing()) {
-                    LOG.debug("closing session {} have empty write queue, so we close it", this);
-                    // we was flushing writes, now we to the close
-                    channelClose();
+            //
+            // IMPORTANT : this section is synchronized so that the OP_WRITE flag
+            // can be set safely by both the selector thread and the writer thread.
+            synchronized (writeQueue) {
+                if (writeQueue.isEmpty()) {
+                    if (isClosing()) {
+                        LOG.debug("closing session {} have empty write queue, so we close it", this);
+                        // we was flushing writes, now we to the close
+                        channelClose();
+                    } else {
+                        // no more write event needed
+                        selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+                    }
                 } else {
-                    // no more write event needed
-                    selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+                    // We have some more data to write : the channel OP_WRITE interest remains 
+                    // as it was.
                 }
-            } else {
-                // We have some more data to write : the channel OP_WRITE interest remains 
-                // as it was.
             }
         } catch (final IOException e) {
             LOG.error("Exception while reading : ", e);