You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/28 16:30:20 UTC

svn commit: r1711035 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport: NonBlockingConnection.java SelectorThread.java

Author: lquack
Date: Wed Oct 28 15:30:19 2015
New Revision: 1711035

URL: http://svn.apache.org/viewvc?rev=1711035&view=rev
Log:
QPID-6788: [Java Broker] NonBlockingConnections now read and process work only after all buffered data has been written to the network.

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1711035&r1=1711034&r2=1711035&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Oct 28 15:30:19 2015
@@ -199,10 +199,10 @@ public class NonBlockingConnection imple
 
     public boolean canRead()
     {
-        return true;
+        return _fullyWritten;
     }
 
-    public boolean waitingForWrite()
+    public boolean canWrite()
     {
         return !_fullyWritten;
     }
@@ -228,20 +228,28 @@ public class NonBlockingConnection imple
 
                 _protocolEngine.setMessageAssignmentSuspended(true);
 
-                _protocolEngine.processPending();
-
-                _protocolEngine.setTransportBlockedForWriting(!doWrite());
-                boolean dataRead = doRead();
-                _fullyWritten = doWrite();
-                _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
-
-                if (dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
+                if (!_fullyWritten)
                 {
-                    _protocolEngine.notifyWork();
+                    _fullyWritten = doWrite();
                 }
 
-                // tell all consumer targets that it is okay to accept more
-                _protocolEngine.setMessageAssignmentSuspended(false);
+                if (_fullyWritten)
+                {
+                    _protocolEngine.processPending();
+
+                    _protocolEngine.setTransportBlockedForWriting(!doWrite());
+                    boolean dataRead = doRead();
+                    _fullyWritten = doWrite();
+                    _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+                    if (dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
+                    {
+                        _protocolEngine.notifyWork();
+                    }
+
+                    // tell all consumer targets that it is okay to accept more
+                    _protocolEngine.setMessageAssignmentSuspended(false);
+                }
             }
             catch (IOException | ConnectionScopedRuntimeException e)
             {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1711035&r1=1711034&r2=1711035&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Oct 28 15:30:19 2015
@@ -208,7 +208,7 @@ class SelectorThread extends Thread
 
 
                 final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
-                                | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+                                | (unregisteredConnection.canWrite() ? SelectionKey.OP_WRITE : 0);
                 try
                 {
                     unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
@@ -460,11 +460,9 @@ class SelectorThread extends Thread
         if(selectionTask != null)
         {
             final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
-            int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
-            if (connection.waitingForWrite())
-            {
-                expectedOps |= SelectionKey.OP_WRITE;
-            }
+            int expectedOps = (connection.canRead() ? SelectionKey.OP_READ : 0)
+                              | (connection.canWrite() ? SelectionKey.OP_WRITE : 0);
+
             try
             {
                 return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org