You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/10/22 15:07:46 UTC

svn commit: r1710010 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport: NetworkConnectionScheduler.java NonBlockingConnection.java SelectorThread.java

Author: rgodfrey
Date: Thu Oct 22 13:07:46 2015
New Revision: 1710010

URL: http://svn.apache.org/viewvc?rev=1710010&view=rev
Log:
QPID-6794 : Allow for multiple selector tasks to allow for better scaling with many connections

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Thu Oct 22 13:07:46 2015
@@ -190,11 +190,6 @@ public class NetworkConnectionScheduler
         _selectorThread.addConnection(connection);
     }
 
-    public void wakeup()
-    {
-        _selectorThread.wakeup();
-    }
-
     public void removeConnection(final NonBlockingConnection connection)
     {
         _selectorThread.removeConnection(connection);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Oct 22 13:07:46 2015
@@ -68,6 +68,7 @@ public class NonBlockingConnection imple
     private final AtomicBoolean _scheduled = new AtomicBoolean();
     private volatile boolean _unexpectedByteBufferSizeReported;
     private final String _threadName;
+    private volatile SelectorThread.SelectionTask _selectionTask;
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ProtocolEngine protocolEngine,
@@ -144,7 +145,7 @@ public class NonBlockingConnection imple
         if(_closed.compareAndSet(false,true))
         {
             _protocolEngine.notifyWork();
-            _scheduler.wakeup();
+            _selectionTask.wakeup();
         }
     }
 
@@ -501,4 +502,13 @@ public class NonBlockingConnection imple
         }
     }
 
+    public SelectorThread.SelectionTask getSelectionTask()
+    {
+        return _selectionTask;
+    }
+
+    public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
+    {
+        _selectionTask = selectionTask;
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1710010&r1=1710009&r2=1710010&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Oct 22 13:07:46 2015
@@ -35,6 +35,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,37 +48,291 @@ class SelectorThread extends Thread
     static final String IO_THREAD_NAME_PREFIX  = "IO-";
     private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
 
-    /**
-     * Queue of connections that are not currently scheduled and not registered with the selector.
-     * These need to go back into the Selector.
-     */
-    private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
-
-    /** Set of connections that are currently being selected upon */
-    private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
-
-    private final Selector _selector;
     private final AtomicBoolean _closed = new AtomicBoolean();
-    private final AtomicBoolean _selecting = new AtomicBoolean();
     private final NetworkConnectionScheduler _scheduler;
-    private long _nextTimeout;
 
     private final BlockingQueue<Runnable> _workQueue = new LinkedBlockingQueue<>();
+    private final  AtomicInteger _nextSelectorTaskIndex = new AtomicInteger();
 
-    private Runnable _selectionTask = new Runnable()
+    public final class SelectionTask implements Runnable
     {
+        private final Selector _selector;
+        private final AtomicBoolean _selecting = new AtomicBoolean();
+        private final AtomicBoolean _inSelect = new AtomicBoolean();
+        private final AtomicInteger _wakeups = new AtomicInteger();
+        private long _nextTimeout;
+
+        /**
+         * Queue of connections that are not currently scheduled and not registered with the selector.
+         * These need to go back into the Selector.
+         */
+        private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+
+        /** Set of connections that are currently being selected upon */
+        private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+
+
+
+        private SelectionTask() throws IOException
+        {
+            _selector = Selector.open();
+        }
+
         @Override
         public void run()
         {
             performSelect();
         }
-    };
+
+        public boolean acquireSelecting()
+        {
+            return _selecting.compareAndSet(false,true);
+        }
+
+        public void clearSelecting()
+        {
+            _selecting.set(false);
+        }
+
+        public Selector getSelector()
+        {
+            return _selector;
+        }
+
+        public Queue<NonBlockingConnection> getUnregisteredConnections()
+        {
+            return _unregisteredConnections;
+        }
+
+        public Set<NonBlockingConnection> getUnscheduledConnections()
+        {
+            return _unscheduledConnections;
+        }
+
+        private List<NonBlockingConnection> processUnscheduledConnections()
+        {
+            List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+            long currentTime = System.currentTimeMillis();
+            Iterator<NonBlockingConnection> iterator = getUnscheduledConnections().iterator();
+            _nextTimeout = Integer.MAX_VALUE;
+            while (iterator.hasNext())
+            {
+                NonBlockingConnection connection = iterator.next();
+
+                int period = connection.getTicker().getTimeToNextTick(currentTime);
+
+                if (period <= 0 || connection.isStateChanged())
+                {
+                    toBeScheduled.add(connection);
+                    try
+                    {
+                        connection.getSocketChannel().register(_selector, 0);
+                    }
+                    catch (ClosedChannelException | CancelledKeyException e)
+                    {
+                        LOGGER.debug("Failed to register with selector for connection " + connection +
+                                     ". Connection is probably being closed by peer.", e);
+                    }
+                    iterator.remove();
+                }
+                else
+                {
+                    _nextTimeout = Math.min(period, _nextTimeout);
+                }
+            }
+
+            return toBeScheduled;
+        }
+
+        private List<NonBlockingConnection> processSelectionKeys()
+        {
+            List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
+
+            Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+            for (SelectionKey key : selectionKeys)
+            {
+                if(key.isAcceptable())
+                {
+                    NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
+                    // todo - should we schedule this rather than running in this thread?
+                    transport.acceptSocketChannel((ServerSocketChannel)key.channel());
+                }
+                else
+                {
+                    NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+                    if(connection != null)
+                    {
+                        try
+                        {
+                            key.channel().register(_selector, 0);
+                        }
+                        catch (ClosedChannelException e)
+                        {
+                            // Ignore - we will schedule the connection anyway
+                        }
+
+                        toBeScheduled.add(connection);
+                        getUnscheduledConnections().remove(connection);
+                    }
+                }
+
+            }
+            selectionKeys.clear();
+
+            return toBeScheduled;
+        }
+
+        private List<NonBlockingConnection> reregisterUnregisteredConnections()
+        {
+            List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
+
+            NonBlockingConnection unregisteredConnection;
+            while ((unregisteredConnection = getUnregisteredConnections().poll()) != null)
+            {
+                getUnscheduledConnections().add(unregisteredConnection);
+
+
+                final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+                                | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+                try
+                {
+                    unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+                }
+                catch (ClosedChannelException e)
+                {
+                    unregisterableConnections.add(unregisteredConnection);
+                }
+            }
+
+            return unregisterableConnections;
+        }
+
+        private void performSelect()
+        {
+            while(!_closed.get())
+            {
+                if(acquireSelecting())
+                {
+                    List<ConnectionProcessor> connections = new ArrayList<>();
+                    try
+                    {
+                        if (!_closed.get())
+                        {
+                            Thread.currentThread().setName("Selector-" + _scheduler.getName());
+                            try
+                            {
+                                if(_wakeups.getAndSet(0) > 0)
+                                {
+                                    _selector.selectNow();
+                                }
+                                else
+                                {
+                                    _inSelect.set(true);
+
+                                    _selector.select(_nextTimeout);
+
+                                    _inSelect.set(false);
+                                }
+
+                            }
+                            catch (IOException e)
+                            {
+                                // TODO Inform the model object
+                                LOGGER.error("Failed to trying to select()", e);
+                                closeSelector();
+                                return;
+                            }
+                            runTasks();
+                            for (NonBlockingConnection connection : processSelectionKeys())
+                            {
+                                if(connection.setScheduled())
+                                {
+                                    connections.add(new ConnectionProcessor(_scheduler, connection));
+                                }
+                            }
+                            for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+                            {
+                                if(connection.setScheduled())
+                                {
+                                    connections.add(new ConnectionProcessor(_scheduler, connection));
+                                }
+                            }
+                            for (NonBlockingConnection connection : processUnscheduledConnections())
+                            {
+                                if(connection.setScheduled())
+                                {
+                                    connections.add(new ConnectionProcessor(_scheduler, connection));
+                                }
+                            }
+
+                        }
+                    }
+                    finally
+                    {
+                        clearSelecting();
+                    }
+                    _workQueue.add(this);
+                    _workQueue.addAll(connections);
+                    for(ConnectionProcessor connectionProcessor : connections)
+                    {
+                        connectionProcessor.run();
+                    }
+
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            if(_closed.get() && acquireSelecting())
+            {
+                closeSelector();
+            }
+        }
+
+        private void closeSelector()
+        {
+            try
+            {
+                if(_selector.isOpen())
+                {
+                    _selector.close();
+                }
+            }
+            catch (IOException e)
+
+            {
+                LOGGER.debug("Failed to close selector", e);
+            }
+        }
+
+        public void wakeup()
+        {
+            _wakeups.compareAndSet(0, 1);
+            if(_inSelect.get() && _wakeups.get() != 0)
+            {
+                _selector.wakeup();
+            }
+        }
+    }
+
+    private SelectionTask[] _selectionTasks;
 
     SelectorThread(final NetworkConnectionScheduler scheduler) throws IOException
     {
-        _selector = Selector.open();
         _scheduler = scheduler;
-        _workQueue.add(_selectionTask);
+        int selectors = Math.max(scheduler.getPoolSizeMaximum()/8,1);
+        _selectionTasks = new SelectionTask[selectors];
+        for(int i = 0; i < selectors; i++)
+        {
+            _selectionTasks[i] = new SelectionTask();
+        }
+        for(SelectionTask task : _selectionTasks)
+        {
+            _workQueue.add(task);
+        }
     }
 
     public void addAcceptingSocket(final ServerSocketChannel socketChannel,
@@ -91,7 +346,7 @@ class SelectorThread extends Thread
 
                 try
                 {
-                    socketChannel.register(_selector, SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
+                    socketChannel.register(_selectionTasks[0].getSelector(), SelectionKey.OP_ACCEPT, nonBlockingNetworkTransport);
                 }
                 catch (IllegalStateException | ClosedChannelException e)
                 {
@@ -99,7 +354,7 @@ class SelectorThread extends Thread
                     LOGGER.error("Failed to register selector on accepting port", e);
                 }             }
         });
-        _selector.wakeup();
+        _selectionTasks[0].wakeup();
     }
 
     public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
@@ -109,50 +364,14 @@ class SelectorThread extends Thread
             @Override
             public void run()
             {
-                SelectionKey selectionKey = socketChannel.keyFor(_selector);
+                SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
                 if (selectionKey != null)
                 {
                     selectionKey.cancel();
                 }
             }
         });
-        _selector.wakeup();
-    }
-
-    private List<NonBlockingConnection> processUnscheduledConnections()
-    {
-        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-        long currentTime = System.currentTimeMillis();
-        Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
-        _nextTimeout = Integer.MAX_VALUE;
-        while (iterator.hasNext())
-        {
-            NonBlockingConnection connection = iterator.next();
-
-            int period = connection.getTicker().getTimeToNextTick(currentTime);
-
-            if (period <= 0 || connection.isStateChanged())
-            {
-                toBeScheduled.add(connection);
-                try
-                {
-                    unregisterConnection(connection);
-                }
-                catch (ClosedChannelException e)
-                {
-                    LOGGER.debug("Failed to register with selector for connection " + connection +
-                                 ". Connection is probably being closed by peer.", e);
-                }
-                iterator.remove();
-            }
-            else
-            {
-                _nextTimeout = Math.min(period, _nextTimeout);
-            }
-        }
-
-        return toBeScheduled;
+        _selectionTasks[0].wakeup();
     }
 
     @Override
@@ -177,22 +396,6 @@ class SelectorThread extends Thread
 
     }
 
-    private void closeSelector()
-    {
-        try
-        {
-            if(_selector.isOpen())
-            {
-                _selector.close();
-            }
-        }
-        catch (IOException e)
-
-        {
-            LOGGER.debug("Failed to close selector", e);
-        }
-    }
-
     private static final class ConnectionProcessor implements Runnable
     {
 
@@ -216,145 +419,12 @@ class SelectorThread extends Thread
         }
     }
 
-    private void performSelect()
-    {
-        while(!_closed.get())
-        {
-            if(_selecting.compareAndSet(false,true))
-            {
-                List<ConnectionProcessor> connections = new ArrayList<>();
-                try
-                {
-                    if (!_closed.get())
-                    {
-                        Thread.currentThread().setName("Selector-" + _scheduler.getName());
-                        try
-                        {
-                             _selector.select(_nextTimeout);
-                        }
-                        catch (IOException e)
-                        {
-                            // TODO Inform the model object
-                            LOGGER.error("Failed to trying to select()", e);
-                            closeSelector();
-                            return;
-                        }
-                        runTasks();
-                        for (NonBlockingConnection connection : processSelectionKeys())
-                        {
-                            if(connection.setScheduled())
-                            {
-                                connections.add(new ConnectionProcessor(_scheduler, connection));
-                            }
-                        }
-                        for (NonBlockingConnection connection : reregisterUnregisteredConnections())
-                        {
-                            if(connection.setScheduled())
-                            {
-                                connections.add(new ConnectionProcessor(_scheduler, connection));
-                            }
-                        }
-                        for (NonBlockingConnection connection : processUnscheduledConnections())
-                        {
-                            if(connection.setScheduled())
-                            {
-                                connections.add(new ConnectionProcessor(_scheduler, connection));
-                            }
-                        }
-
-                    }
-                }
-                finally
-                {
-                    _selecting.set(false);
-                }
-                _workQueue.add(_selectionTask);
-                _workQueue.addAll(connections);
-                for(ConnectionProcessor connectionProcessor : connections)
-                {
-                    connectionProcessor.run();
-                }
-
-            }
-            else
-            {
-                break;
-            }
-        }
-
-        if(_closed.get() && _selecting.compareAndSet(false,true))
-        {
-            closeSelector();
-        }
-    }
-
     private void unregisterConnection(final NonBlockingConnection connection) throws ClosedChannelException
     {
-        SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+        SelectionKey register = connection.getSocketChannel().register(connection.getSelectionTask().getSelector(), 0);
         register.cancel();
     }
 
-    private List<NonBlockingConnection> reregisterUnregisteredConnections()
-    {
-        List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
-
-        while (_unregisteredConnections.peek() != null)
-        {
-            NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
-            _unscheduledConnections.add(unregisteredConnection);
-
-
-            final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
-                            | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
-            try
-            {
-                unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
-            }
-            catch (ClosedChannelException e)
-            {
-                unregisterableConnections.add(unregisteredConnection);
-            }
-        }
-
-        return unregisterableConnections;
-    }
-
-    private List<NonBlockingConnection> processSelectionKeys()
-    {
-        List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-        Set<SelectionKey> selectionKeys = _selector.selectedKeys();
-        for (SelectionKey key : selectionKeys)
-        {
-            if(key.isAcceptable())
-            {
-                NonBlockingNetworkTransport transport = (NonBlockingNetworkTransport) key.attachment();
-                // todo - should we schedule this rather than running in this thread?
-                transport.acceptSocketChannel((ServerSocketChannel)key.channel());
-            }
-            else
-            {
-                NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
-                try
-                {
-                    key.channel().register(_selector, 0);
-                }
-                catch (ClosedChannelException e)
-                {
-                    // Ignore - we will schedule the connection anyway
-                }
-
-                toBeScheduled.add(connection);
-                _unscheduledConnections.remove(connection);
-            }
-
-        }
-        selectionKeys.clear();
-
-        return toBeScheduled;
-    }
-
     private void runTasks()
     {
         while(_tasks.peek() != null)
@@ -366,17 +436,25 @@ class SelectorThread extends Thread
 
     private boolean selectionInterestRequiresUpdate(NonBlockingConnection connection)
     {
-        final SelectionKey selectionKey = connection.getSocketChannel().keyFor(_selector);
-        int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
-        if(connection.waitingForWrite())
+        SelectionTask selectionTask = connection.getSelectionTask();
+        if(selectionTask != null)
         {
-            expectedOps |= SelectionKey.OP_WRITE;
-        }
-        try
-        {
-            return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;
+            final SelectionKey selectionKey = connection.getSocketChannel().keyFor(selectionTask.getSelector());
+            int expectedOps = connection.canRead() ? SelectionKey.OP_READ : 0;
+            if (connection.waitingForWrite())
+            {
+                expectedOps |= SelectionKey.OP_WRITE;
+            }
+            try
+            {
+                return selectionKey == null || !selectionKey.isValid() || selectionKey.interestOps() != expectedOps;
+            }
+            catch (CancelledKeyException e)
+            {
+                return true;
+            }
         }
-        catch (CancelledKeyException e)
+        else
         {
             return true;
         }
@@ -386,12 +464,25 @@ class SelectorThread extends Thread
     {
         if(selectionInterestRequiresUpdate(connection))
         {
-            _unregisteredConnections.add(connection);
-            _selector.wakeup();
+            SelectionTask selectionTask = getNextSelectionTask();
+            connection.setSelectionTask(selectionTask);
+            selectionTask.getUnregisteredConnections().add(connection);
+            selectionTask.wakeup();
         }
 
     }
 
+    private SelectionTask getNextSelectionTask()
+    {
+        int index;
+        do
+        {
+            index = _nextSelectorTaskIndex.get();
+        }
+        while(!_nextSelectorTaskIndex.compareAndSet(index, (index + 1) % _selectionTasks.length));
+        return _selectionTasks[index];
+    }
+
     void removeConnection(NonBlockingConnection connection)
     {
         try
@@ -406,11 +497,6 @@ class SelectorThread extends Thread
         }
     }
 
-    public void wakeup()
-    {
-        _selector.wakeup();
-    }
-
     public void close()
     {
         Runnable goodNight = new Runnable()
@@ -429,7 +515,10 @@ class SelectorThread extends Thread
             _workQueue.offer(goodNight);
         }
 
-        _selector.wakeup();
+        for(SelectionTask task : _selectionTasks)
+        {
+            task.wakeup();
+        }
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org