You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/01/29 12:28:32 UTC
svn commit: r1655596 - in
/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io:
NonBlockingSenderReceiver.java SelectorThread.java
Author: rgodfrey
Date: Thu Jan 29 11:28:32 2015
New Revision: 1655596
URL: http://svn.apache.org/r1655596
Log:
Ensure selectors are closed, and that all work that can be done on a read is actually done
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1655596&r1=1655595&r2=1655596&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Thu Jan 29 11:28:32 2015
@@ -21,8 +21,6 @@ package org.apache.qpid.transport.networ
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.util.ArrayList;
@@ -42,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
@@ -78,6 +75,7 @@ public class NonBlockingSenderReceiver
private SSLEngineResult _status;
private volatile boolean _fullyWritten = true;
private AtomicBoolean _stateChanged = new AtomicBoolean();
+ private boolean _workDone;
public NonBlockingSenderReceiver(final NonBlockingConnection connection,
@@ -152,12 +150,13 @@ public class NonBlockingSenderReceiver
public boolean doWork()
{
_stateChanged.set(false);
-
boolean closed = _closed.get();
if (!closed)
{
try
{
+ _workDone = false;
+
long currentTime = System.currentTimeMillis();
int tick = _ticker.getTimeToNextTick(currentTime);
if (tick <= 0)
@@ -170,6 +169,10 @@ public class NonBlockingSenderReceiver
_fullyWritten = doWrite();
_receiver.setTransportBlockedForWriting(!_fullyWritten);
+ if(_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)
+ {
+ _stateChanged.set(true);
+ }
}
catch (IOException e)
{
@@ -267,7 +270,6 @@ public class NonBlockingSenderReceiver
public void close()
{
LOGGER.debug("Closing " + _remoteSocketAddress);
-
_closed.set(true);
_stateChanged.set(true);
_connection.getSelector().wakeup();
@@ -310,11 +312,11 @@ public class NonBlockingSenderReceiver
else if(_transportEncryption == TransportEncryption.TLS)
{
int remaining = 0;
-
do
{
if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
{
+ _workDone = true;
final ByteBuffer netBuffer = ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
_status = _sslEngine.wrap(bufArray, netBuffer);
runSSLEngineTasks(_status);
@@ -337,14 +339,12 @@ public class NonBlockingSenderReceiver
}
while(remaining != 0 && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
-
ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]);
long written = _socketChannel.write(encryptedBuffers);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Written " + written + " encrypted bytes");
}
-
ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
while(iter.hasNext())
{
@@ -409,25 +409,27 @@ public class NonBlockingSenderReceiver
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Read " + read + " encrypted bytes " + _netInputBuffer);
+ LOGGER.debug("Read " + read + " encrypted bytes ");
}
+
_netInputBuffer.flip();
int unwrapped = 0;
+ boolean tasksRun;
do
{
ByteBuffer appInputBuffer =
ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
_status = _sslEngine.unwrap(_netInputBuffer, appInputBuffer);
- runSSLEngineTasks(_status);
+ tasksRun = runSSLEngineTasks(_status);
appInputBuffer.flip();
unwrapped = appInputBuffer.remaining();
_receiver.received(appInputBuffer);
}
- while(unwrapped > 0);
+ while(unwrapped > 0 || tasksRun);
_netInputBuffer.compact();
@@ -476,7 +478,7 @@ public class NonBlockingSenderReceiver
}
}
- private void runSSLEngineTasks(final SSLEngineResult status)
+ private boolean runSSLEngineTasks(final SSLEngineResult status)
{
if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
{
@@ -485,7 +487,9 @@ public class NonBlockingSenderReceiver
{
task.run();
}
+ return true;
}
+ return false;
}
private boolean looksLikeSSL(byte[] headerBytes)
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1655596&r1=1655595&r2=1655596&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java Thu Jan 29 11:28:32 2015
@@ -39,7 +39,7 @@ public class SelectorThread extends Thre
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
- private final Selector _selector;
+ private Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
@@ -62,86 +62,80 @@ public class SelectorThread extends Thre
{
long nextTimeout = 0;
- while(!_closed.get())
- {
-
- try
+ try
+ {
+ try (Selector selector = Selector.open())
{
+ _selector = selector;
+ while (!_closed.get())
+ {
- _selector.select(nextTimeout);
-
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
+ _selector.select(nextTimeout);
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- key.channel().register(_selector, 0);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
+ key.channel().register(_selector, 0);
- }
- selectionKeys.clear();
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
- while(_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ }
+ selectionKeys.clear();
+ while (_unregisteredConnections.peek() != null)
+ {
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- }
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while(iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
+ }
- int period = connection.getTicker().getTimeToNextTick(currentTime);
- if (period < 0 || connection.isStateChanged())
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
{
- toBeScheduled.add(connection);
- iterator.remove();
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+ if (period < 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ iterator.remove();
+ }
+ else
+ {
+ nextTimeout = Math.min(period, nextTimeout);
+ }
}
- else
+
+ for (NonBlockingConnection connection : toBeScheduled)
{
- nextTimeout = Math.min(period, nextTimeout);
+ _scheduler.schedule(connection);
}
- }
- for(NonBlockingConnection connection : toBeScheduled)
- {
- _scheduler.schedule(connection);
}
-
}
- catch (IOException e)
- {
- // Close ourselves? Inform accepting thread??
- e.printStackTrace();
- }
-
- }
- try
- {
- _selector.close();
}
catch (IOException e)
{
- // TODO
+ //TODO
e.printStackTrace();
}
+
}
public void addConnection(final NonBlockingConnection connection)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org