You are viewing a plain text version of this content; the link to the canonical version was not preserved in this text rendering.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/06/13 20:19:51 UTC

[qpid-broker-j] branch main updated: QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new baaad4f  QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread
baaad4f is described below

commit baaad4f2370688a89a4602b8380f5825da84f33d
Author: Marek Laca <mk...@gmail.com>
AuthorDate: Thu Jun 3 22:52:54 2021 +0200

    QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread
    
    This closes #90
---
 .../qpid/server/transport/SelectorThread.java      | 108 ++++++++-------------
 1 file changed, 39 insertions(+), 69 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index 6c0ce8d..dfd7659 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -71,8 +71,6 @@ class SelectorThread extends Thread
     {
         private final Selector _selector;
         private final AtomicBoolean _selecting = new AtomicBoolean();
-        private final AtomicBoolean _inSelect = new AtomicBoolean();
-        private final AtomicInteger _wakeups = new AtomicInteger();
         private long _nextTimeout;
 
         /**
@@ -102,7 +100,7 @@ class SelectorThread extends Thread
             return _selecting.compareAndSet(false,true);
         }
 
-        public void clearSelecting()
+        private void clearSelecting()
         {
             _selecting.set(false);
         }
@@ -295,81 +293,57 @@ class SelectorThread extends Thread
             _scheduler.incrementRunningCount();
             try
             {
-                while (!_closed.get())
+                while (!_closed.get() && acquireSelecting())
                 {
-                    if (acquireSelecting())
+                    final List<ConnectionProcessor> connections = new ArrayList<>();
+                    try
                     {
-                        List<ConnectionProcessor> connections = new ArrayList<>();
-                        try
+                        Thread.currentThread().setName(_scheduler.getSelectorThreadName());
+                        _selector.select(_nextTimeout);
+
+                        for (final NonBlockingConnection connection : processSelectionKeys())
                         {
-                            if (!_closed.get())
+                            if (connection.setScheduled())
                             {
-                                Thread.currentThread().setName(_scheduler.getSelectorThreadName());
-                                _inSelect.set(true);
-                                try
-                                {
-                                    if (_wakeups.getAndSet(0) > 0)
-                                    {
-                                        _selector.selectNow();
-                                    }
-                                    else
-                                    {
-                                        _selector.select(_nextTimeout);
-                                    }
-                                }
-                                catch (IOException e)
-                                {
-                                    // TODO Inform the model object
-                                    LOGGER.error("Failed to trying to select()", e);
-                                    closeSelector();
-                                    return;
-                                }
-                                finally
-                                {
-                                    _inSelect.set(false);
-                                }
-                                for (NonBlockingConnection connection : processSelectionKeys())
-                                {
-                                    if (connection.setScheduled())
-                                    {
-                                        connections.add(new ConnectionProcessor(_scheduler, connection));
-                                    }
-                                }
-                                for (NonBlockingConnection connection : reregisterUnregisteredConnections())
-                                {
-                                    if (connection.setScheduled())
-                                    {
-                                        connections.add(new ConnectionProcessor(_scheduler, connection));
-                                    }
-                                }
-                                for (NonBlockingConnection connection : processUnscheduledConnections())
-                                {
-                                    if (connection.setScheduled())
-                                    {
-                                        connections.add(new ConnectionProcessor(_scheduler, connection));
-                                    }
-                                }
-                                runTasks();
+                                connections.add(new ConnectionProcessor(_scheduler, connection));
                             }
                         }
-                        finally
+                        for (final NonBlockingConnection connection : reregisterUnregisteredConnections())
                         {
-                            clearSelecting();
+                            if (connection.setScheduled())
+                            {
+                                connections.add(new ConnectionProcessor(_scheduler, connection));
+                            }
                         }
-
-                        if (!connections.isEmpty())
+                        for (final NonBlockingConnection connection : processUnscheduledConnections())
                         {
-                            _workQueue.addAll(connections);
-                            _workQueue.add(this);
-                            for (ConnectionProcessor connectionProcessor : connections)
+                            if (connection.setScheduled())
                             {
-                                connectionProcessor.processConnection();
+                                connections.add(new ConnectionProcessor(_scheduler, connection));
                             }
                         }
+                        runTasks();
+                    }
+                    catch (IOException e)
+                    {
+                        // TODO Inform the model object
+                        LOGGER.error("Failed to trying to select()", e);
+                        closeSelector();
+                        return;
                     }
-                    else
+                    finally
                     {
-                        break;
+                        clearSelecting();
+                    }
+
+                    if (!connections.isEmpty())
+                    {
+                        _workQueue.addAll(connections);
+                        _workQueue.add(this);
+                        for (final ConnectionProcessor connectionProcessor : connections)
+                        {
+                            connectionProcessor.processConnection();
+                        }
                     }
                 }
 
@@ -402,11 +376,7 @@ class SelectorThread extends Thread
 
         public void wakeup()
         {
-            _wakeups.compareAndSet(0, 1);
-            if(_inSelect.get() && _wakeups.get() != 0)
-            {
-                _selector.wakeup();
-            }
+            _selector.wakeup();
         }
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org