You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2021/06/13 20:19:51 UTC
[qpid-broker-j] branch main updated: QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new baaad4f QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread
baaad4f is described below
commit baaad4f2370688a89a4602b8380f5825da84f33d
Author: Marek Laca <mk...@gmail.com>
AuthorDate: Thu Jun 3 22:52:54 2021 +0200
QPID-8530: [Broker-J] Duplicated functionality of the Selector::wakeup method in SelectorThread
This closes #90
---
.../qpid/server/transport/SelectorThread.java | 108 ++++++++-------------
1 file changed, 39 insertions(+), 69 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index 6c0ce8d..dfd7659 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -71,8 +71,6 @@ class SelectorThread extends Thread
{
private final Selector _selector;
private final AtomicBoolean _selecting = new AtomicBoolean();
- private final AtomicBoolean _inSelect = new AtomicBoolean();
- private final AtomicInteger _wakeups = new AtomicInteger();
private long _nextTimeout;
/**
@@ -102,7 +100,7 @@ class SelectorThread extends Thread
return _selecting.compareAndSet(false,true);
}
- public void clearSelecting()
+ private void clearSelecting()
{
_selecting.set(false);
}
@@ -295,81 +293,57 @@ class SelectorThread extends Thread
_scheduler.incrementRunningCount();
try
{
- while (!_closed.get())
+ while (!_closed.get() && acquireSelecting())
{
- if (acquireSelecting())
+ final List<ConnectionProcessor> connections = new ArrayList<>();
+ try
{
- List<ConnectionProcessor> connections = new ArrayList<>();
- try
+ Thread.currentThread().setName(_scheduler.getSelectorThreadName());
+ _selector.select(_nextTimeout);
+
+ for (final NonBlockingConnection connection : processSelectionKeys())
{
- if (!_closed.get())
+ if (connection.setScheduled())
{
- Thread.currentThread().setName(_scheduler.getSelectorThreadName());
- _inSelect.set(true);
- try
- {
- if (_wakeups.getAndSet(0) > 0)
- {
- _selector.selectNow();
- }
- else
- {
- _selector.select(_nextTimeout);
- }
- }
- catch (IOException e)
- {
- // TODO Inform the model object
- LOGGER.error("Failed to trying to select()", e);
- closeSelector();
- return;
- }
- finally
- {
- _inSelect.set(false);
- }
- for (NonBlockingConnection connection : processSelectionKeys())
- {
- if (connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : reregisterUnregisteredConnections())
- {
- if (connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : processUnscheduledConnections())
- {
- if (connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- runTasks();
+ connections.add(new ConnectionProcessor(_scheduler, connection));
}
}
- finally
+ for (final NonBlockingConnection connection : reregisterUnregisteredConnections())
{
- clearSelecting();
+ if (connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
}
-
- if (!connections.isEmpty())
+ for (final NonBlockingConnection connection : processUnscheduledConnections())
{
- _workQueue.addAll(connections);
- _workQueue.add(this);
- for (ConnectionProcessor connectionProcessor : connections)
+ if (connection.setScheduled())
{
- connectionProcessor.processConnection();
+ connections.add(new ConnectionProcessor(_scheduler, connection));
}
}
+ runTasks();
+ }
+ catch (IOException e)
+ {
+ // TODO Inform the model object
+ LOGGER.error("Failed to trying to select()", e);
+ closeSelector();
+ return;
}
- else
+ finally
{
- break;
+ clearSelecting();
+ }
+
+ if (!connections.isEmpty())
+ {
+ _workQueue.addAll(connections);
+ _workQueue.add(this);
+ for (final ConnectionProcessor connectionProcessor : connections)
+ {
+ connectionProcessor.processConnection();
+ }
}
}
@@ -402,11 +376,7 @@ class SelectorThread extends Thread
public void wakeup()
{
- _wakeups.compareAndSet(0, 1);
- if(_inSelect.get() && _wakeups.get() != 0)
- {
- _selector.wakeup();
- }
+ _selector.wakeup();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org