You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/28 16:30:20 UTC
svn commit: r1711035 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport:
NonBlockingConnection.java SelectorThread.java
Author: lquack
Date: Wed Oct 28 15:30:19 2015
New Revision: 1711035
URL: http://svn.apache.org/viewvc?rev=1711035&view=rev
Log:
QPID-6788: [Java Broker] NonBlockingConnections now only read and process work after all buffered data has been written to the network.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1711035&r1=1711034&r2=1711035&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Oct 28 15:30:19 2015
@@ -199,10 +199,10 @@ public class NonBlockingConnection imple
public boolean canRead()
{
- return true;
+ return _fullyWritten;
}
- public boolean waitingForWrite()
+ public boolean canWrite()
{
return !_fullyWritten;
}
@@ -228,20 +228,28 @@ public class NonBlockingConnection imple
_protocolEngine.setMessageAssignmentSuspended(true);
- _protocolEngine.processPending();
-
- _protocolEngine.setTransportBlockedForWriting(!doWrite());
- boolean dataRead = doRead();
- _fullyWritten = doWrite();
- _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
-
- if (dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
+ if (!_fullyWritten)
{
- _protocolEngine.notifyWork();
+ _fullyWritten = doWrite();
}
- // tell all consumer targets that it is okay to accept more
- _protocolEngine.setMessageAssignmentSuspended(false);
+ if (_fullyWritten)
+ {
+ _protocolEngine.processPending();
+
+ _protocolEngine.setTransportBlockedForWriting(!doWrite());
+ boolean dataRead = doRead();
+ _fullyWritten = doWrite();
+ _protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
+
+ if (dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
+ {
+ _protocolEngine.notifyWork();
+ }
+
+ // tell all consumer targets that it is okay to accept more
+ _protocolEngine.setMessageAssignmentSuspended(false);
+ }
}
catch (IOException | ConnectionScopedRuntimeException e)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1711035&r1=1711034&r2=1711035&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Oct 28 15:30:19 2015
@@ -208,7 +208,7 @@ class SelectorThread extends Thread
final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ | (unregisteredConnection.canWrite() ? SelectionKey.OP_WRITE : 0);
try
{
unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
@@ -460,11 +460,9 @@ class SelectorThread extends Thread
if(selectionTask != null)
{
final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
- int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
- if (connection.waitingForWrite())
- {
- expectedOps |= SelectionKey.OP_WRITE;
- }
+ int expectedOps = (connection.canRead() ? SelectionKey.OP_READ : 0)
+ | (connection.canWrite() ? SelectionKey.OP_WRITE : 0);
+
try
{
return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org