You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/22 15:07:46 UTC
svn commit: r1710010 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport:
NetworkConnectionScheduler.java NonBlockingConnection.java
SelectorThread.java
Author: rgodfrey
Date: Thu Oct 22 13:07:46 2015
New Revision: 1710010
URL: http://svn.apache.org/viewvc?rev=1710010&view=rev
Log:
QPID-6794 : Allow for multiple selector tasks to allow for better scaling with many connections
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Thu Oct 22 13:07:46 2015
@@ -190,11 +190,6 @@ public class NetworkConnectionScheduler
_selectorThread.addConnection(connection);
}
- public void wakeup()
- {
- _selectorThread.wakeup();
- }
-
public void removeConnection(final NonBlockingConnection connection)
{
_selectorThread.removeConnection(connection);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Oct 22 13:07:46 2015
@@ -68,6 +68,7 @@ public class NonBlockingConnection imple
private final AtomicBoolean _scheduled = new AtomicBoolean();
private volatile boolean _unexpectedByteBufferSizeReported;
private final String _threadName;
+ private volatile SelectorThread.SelectionTask _selectionTask;
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
@@ -144,7 +145,7 @@ public class NonBlockingConnection imple
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
- _scheduler.wakeup();
+ _selectionTask.wakeup();
}
}
@@ -501,4 +502,13 @@ public class NonBlockingConnection imple
}
}
+ public SelectorThread.SelectionTask getSelectionTask()
+ {
+ return _selectionTask;
+ }
+
+ public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
+ {
+ _selectionTask = selectionTask;
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Oct 22 13:07:46 2015
@@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,37 +48,291 @@ class SelectorThread extends Thread
static final String IO_THREAD_NAME_PREFIX = "IO-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
- /**
- * Queue of connections that are not currently scheduled and not registered with the selector.
- * These need to go back into the Selector.
- */
- private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
-
- /** Set of connections that are currently being selected upon */
- private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
-
- private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
- private final AtomicBoolean _selecting = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler;
- private long _nextTimeout;
private final BlockingQueue<Runnable> _workQueue = new LinkedBlockingQueue<>();
+ private final AtomicInteger _nextSelectorTaskIndex = new AtomicInteger();
- private Runnable _selectionTask = new Runnable()
+ public final class SelectionTask implements Runnable
{
+ private final Selector _selector;
+ private final AtomicBoolean _selecting = new AtomicBoolean();
+ private final AtomicBoolean _inSelect = new AtomicBoolean();
+ private final AtomicInteger _wakeups = new AtomicInteger();
+ private long _nextTimeout;
+
+ /**
+ * Queue of connections that are not currently scheduled and not registered with the selector.
+ * These need to go back into the Selector.
+ */
+ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+
+ /** Set of connections that are currently being selected upon */
+ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+
+
+
+ private SelectionTask() throws IOException
+ {
+ _selector = Selector.open();
+ }
+
@Override
public void run()
{
performSelect();
}
- };
+
+ public boolean acquireSelecting()
+ {
+ return _selecting.compareAndSet(false,true);
+ }
+
+ public void clearSelecting()
+ {
+ _selecting.set(false);
+ }
+
+ public Selector getSelector()
+ {
+ return _selector;
+ }
+
+ public Queue<NonBlockingConnection> getUnregisteredConnections()
+ {
+ return _unregisteredConnections;
+ }
+
+ public Set<NonBlockingConnection> getUnscheduledConnections()
+ {
+ return _unscheduledConnections;
+ }
+
+ private List<NonBlockingConnection> processUnscheduledConnections()
+ {
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = getUnscheduledConnections().iterator();
+ _nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
+
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
+
+ if (period <= 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ try
+ {
+ connection.getSocketChannel().register(_selector, 0);
+ }
+ catch (ClosedChannelException | CancelledKeyException e)
+ {
+ LOGGER.debug("Failed to register with selector for connection " + connection +
+ ". Connection is probably being closed by peer.", e);
+ }
+ iterator.remove();
+ }
+ else
+ {
+ _nextTimeout = Math.min(period, _nextTimeout);
+ }
+ }
+
+ return toBeScheduled;
+ }
+
+ private List<NonBlockingConnection> processSelectionKeys()
+ {
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ if(key.isAcceptable())
+ {
+ NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
+ // todo - should we schedule this rather than running in this thread?
+ transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+ }
+ else
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+ if(connection != null)
+ {
+ try
+ {
+ key.channel().register(_selector, 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ // Ignore - we will schedule the connection anyway
+ }
+
+ toBeScheduled.add(connection);
+ getUnscheduledConnections().remove(connection);
+ }
+ }
+
+ }
+ selectionKeys.clear();
+
+ return toBeScheduled;
+ }
+
+ private List<NonBlockingConnection> reregisterUnregisteredConnections()
+ {
+ List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
+
+ NonBlockingConnection unregisteredConnection;
+ while ((unregisteredConnection = getUnregisteredConnections().poll()) != null)
+ {
+ getUnscheduledConnections().add(unregisteredConnection);
+
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+ try
+ {
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ }
+ catch (ClosedChannelException e)
+ {
+ unregisterableConnections.add(unregisteredConnection);
+ }
+ }
+
+ return unregisterableConnections;
+ }
+
+ private void performSelect()
+ {
+ while(!_closed.get())
+ {
+ if(acquireSelecting())
+ {
+ List<ConnectionProcessor> connections = new ArrayList<>();
+ try
+ {
+ if (!_closed.get())
+ {
+ Thread.currentThread().setName("Selector-" + _scheduler.getName());
+ try
+ {
+ if(_wakeups.getAndSet(0) > 0)
+ {
+ _selector.selectNow();
+ }
+ else
+ {
+ _inSelect.set(true);
+
+ _selector.select(_nextTimeout);
+
+ _inSelect.set(false);
+ }
+
+ }
+ catch (IOException e)
+ {
+ // TODO Inform the model object
+ LOGGER.error("Failed to trying to select()", e);
+ closeSelector();
+ return;
+ }
+ runTasks();
+ for (NonBlockingConnection connection : processSelectionKeys())
+ {
+ if(connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
+ }
+ for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+ {
+ if(connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
+ }
+ for (NonBlockingConnection connection : processUnscheduledConnections())
+ {
+ if(connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
+ }
+
+ }
+ }
+ finally
+ {
+ clearSelecting();
+ }
+ _workQueue.add(this);
+ _workQueue.addAll(connections);
+ for(ConnectionProcessor connectionProcessor : connections)
+ {
+ connectionProcessor.run();
+ }
+
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if(_closed.get() && acquireSelecting())
+ {
+ closeSelector();
+ }
+ }
+
+ private void closeSelector()
+ {
+ try
+ {
+ if(_selector.isOpen())
+ {
+ _selector.close();
+ }
+ }
+ catch (IOException e)
+
+ {
+ LOGGER.debug("Failed to close selector", e);
+ }
+ }
+
+ public void wakeup()
+ {
+ _wakeups.compareAndSet(0, 1);
+ if(_inSelect.get() && _wakeups.get() != 0)
+ {
+ _selector.wakeup();
+ }
+ }
+ }
+
+ private SelectionTask[] _selectionTasks;
SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
{
- _selector = Selector.open();
_scheduler = scheduler;
- _workQueue.add(_selectionTask);
+ int selectors = Math.max(scheduler.getPoolSizeMaximum()/8,1);
+ _selectionTasks = new SelectionTask[selectors];
+ for(int i = 0; i < selectors; i++)
+ {
+ _selectionTasks[i] = new SelectionTask();
+ }
+ for(SelectionTask task : _selectionTasks)
+ {
+ _workQueue.add(task);
+ }
}
public void addAcceptingSocket(final ServerSocketChannel socketChannel,
@@ -91,7 +346,7 @@ class SelectorThread extends Thread
try
{
- socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
+ socketChannel.register(_selectionTasks[0].getSelector(), SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
}
catch (IllegalStateException | ClosedChannelException e)
{
@@ -99,7 +354,7 @@ class SelectorThread extends Thread
LOGGER.error("Failed to register selector on accepting port", e);
} }
});
- _selector.wakeup();
+ _selectionTasks[0].wakeup();
}
public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
@@ -109,50 +364,14 @@ class SelectorThread extends Thread
@Override
public void run()
{
- SelectionKey selectionKey = socketChannel.keyFor(_selector);
+ SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
if (selectionKey != null)
{
selectionKey.cancel();
}
}
});
- _selector.wakeup();
- }
-
- private List<NonBlockingConnection> processUnscheduledConnections()
- {
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- _nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
-
- int period = connection.getTicker().getTimeToNextTick(currentTime);
-
- if (period <= 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- try
- {
- unregisterConnection(connection);
- }
- catch (ClosedChannelException e)
- {
- LOGGER.debug("Failed to register with selector for connection " + connection +
- ". Connection is probably being closed by peer.", e);
- }
- iterator.remove();
- }
- else
- {
- _nextTimeout = Math.min(period, _nextTimeout);
- }
- }
-
- return toBeScheduled;
+ _selectionTasks[0].wakeup();
}
@Override
@@ -177,22 +396,6 @@ class SelectorThread extends Thread
}
- private void closeSelector()
- {
- try
- {
- if(_selector.isOpen())
- {
- _selector.close();
- }
- }
- catch (IOException e)
-
- {
- LOGGER.debug("Failed to close selector", e);
- }
- }
-
private static final class ConnectionProcessor implements Runnable
{
@@ -216,145 +419,12 @@ class SelectorThread extends Thread
}
}
- private void performSelect()
- {
- while(!_closed.get())
- {
- if(_selecting.compareAndSet(false,true))
- {
- List<ConnectionProcessor> connections = new ArrayList<>();
- try
- {
- if (!_closed.get())
- {
- Thread.currentThread().setName("Selector-" + _scheduler.getName());
- try
- {
- _selector.select(_nextTimeout);
- }
- catch (IOException e)
- {
- // TODO Inform the model object
- LOGGER.error("Failed to trying to select()", e);
- closeSelector();
- return;
- }
- runTasks();
- for (NonBlockingConnection connection : processSelectionKeys())
- {
- if(connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : reregisterUnregisteredConnections())
- {
- if(connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : processUnscheduledConnections())
- {
- if(connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
-
- }
- }
- finally
- {
- _selecting.set(false);
- }
- _workQueue.add(_selectionTask);
- _workQueue.addAll(connections);
- for(ConnectionProcessor connectionProcessor : connections)
- {
- connectionProcessor.run();
- }
-
- }
- else
- {
- break;
- }
- }
-
- if(_closed.get() && _selecting.compareAndSet(false,true))
- {
- closeSelector();
- }
- }
-
private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
{
- SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+ SelectionKey register = connection.getSocketChannel().register(connection.getSelectionTask().getSelector(), 0);
register.cancel();
}
- private List<NonBlockingConnection> reregisterUnregisteredConnections()
- {
- List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
-
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
-
-
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- try
- {
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
- }
- catch (ClosedChannelException e)
- {
- unregisterableConnections.add(unregisteredConnection);
- }
- }
-
- return unregisterableConnections;
- }
-
- private List<NonBlockingConnection> processSelectionKeys()
- {
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
- {
- if(key.isAcceptable())
- {
- NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
- // todo - should we schedule this rather than running in this thread?
- transport.acceptSocketChannel((ServerSocketChannel)key.channel());
- }
- else
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
- try
- {
- key.channel().register(_selector, 0);
- }
- catch (ClosedChannelException e)
- {
- // Ignore - we will schedule the connection anyway
- }
-
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
- }
-
- }
- selectionKeys.clear();
-
- return toBeScheduled;
- }
-
private void runTasks()
{
while(_tasks.peek() != null)
@@ -366,17 +436,25 @@ class SelectorThread extends Thread
private boolean selectionInterestRequiresUpdate(NonBlockingConnection connection)
{
- final SelectionKey selectionKey = connection.getSocketChannel().keyFor(_selector);
- int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
- if(connection.waitingForWrite())
+ SelectionTask selectionTask = connection.getSelectionTask();
+ if(selectionTask != null)
{
- expectedOps |= SelectionKey.OP_WRITE;
- }
- try
- {
- return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;
+ final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
+ int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
+ if (connection.waitingForWrite())
+ {
+ expectedOps |= SelectionKey.OP_WRITE;
+ }
+ try
+ {
+ return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;
+ }
+ catch (CancelledKeyException e)
+ {
+ return true;
+ }
}
- catch (CancelledKeyException e)
+ else
{
return true;
}
@@ -386,12 +464,25 @@ class SelectorThread extends Thread
{
if(selectionInterestRequiresUpdate(connection))
{
- _unregisteredConnections.add(connection);
- _selector.wakeup();
+ SelectionTask selectionTask = getNextSelectionTask();
+ connection.setSelectionTask(selectionTask);
+ selectionTask.getUnregisteredConnections().add(connection);
+ selectionTask.wakeup();
}
}
+ private SelectionTask getNextSelectionTask()
+ {
+ int index;
+ do
+ {
+ index = _nextSelectorTaskIndex.get();
+ }
+ while(!_nextSelectorTaskIndex.compareAndSet(index, (index + 1) % _selectionTasks.length));
+ return _selectionTasks[index];
+ }
+
void removeConnection(NonBlockingConnection connection)
{
try
@@ -406,11 +497,6 @@ class SelectorThread extends Thread
}
}
- public void wakeup()
- {
- _selector.wakeup();
- }
-
public void close()
{
Runnable goodNight = new Runnable()
@@ -429,7 +515,10 @@ class SelectorThread extends Thread
_workQueue.offer(goodNight);
}
- _selector.wakeup();
+ for(SelectionTask task : _selectionTasks)
+ {
+ task.wakeup();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org