You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/11/14 11:23:37 UTC
svn commit: r1769597 - in
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport:
AggregateTicker.java NonBlockingConnection.java SelectorThread.java
Author: kwall
Date: Mon Nov 14 11:23:36 2016
New Revision: 1769597
URL: http://svn.apache.org/viewvc?rev=1769597&view=rev
Log:
QPID-7508: [Java Broker] Ensure that returning a connection to the selector pays attention to a change in ticker state
Also prevent unnecessary spinning of the selector
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java Mon Nov 14 11:23:36 2016
@@ -20,6 +20,7 @@
package org.apache.qpid.server.transport;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.transport.network.Ticker;
@@ -27,6 +28,7 @@ public class AggregateTicker implements
{
private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _modified = new AtomicBoolean();
@Override
public int getTimeToNextTick(final long currentTime)
@@ -58,19 +60,26 @@ public class AggregateTicker implements
return nextTick;
}
- public CopyOnWriteArrayList<Ticker> getTickers()
- {
- return _tickers;
- }
-
public void addTicker(Ticker ticker)
{
_tickers.add(ticker);
+ _modified.set(true);
}
public void removeTicker(Ticker ticker)
{
_tickers.remove(ticker);
+ _modified.set(true);
+ }
+
+ public boolean getModified()
+ {
+ return _modified.get();
+ }
+
+ public void resetModified()
+ {
+ _modified.set(false);
}
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Mon Nov 14 11:23:36 2016
@@ -133,7 +133,7 @@ public class NonBlockingConnection imple
return _partialRead;
}
- Ticker getTicker()
+ AggregateTicker getTicker()
{
return _protocolEngine.getAggregateTicker();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Mon Nov 14 11:23:36 2016
@@ -138,7 +138,9 @@ class SelectorThread extends Thread
{
NonBlockingConnection connection = iterator.next();
- int period = connection.getTicker().getTimeToNextTick(currentTime);
+ final AggregateTicker ticker = connection.getTicker();
+ int period = ticker.getTimeToNextTick(currentTime);
+ ticker.resetModified();
if (period <= 0 || connection.isStateChanged())
{
@@ -616,13 +618,14 @@ class SelectorThread extends Thread
public void returnConnectionToSelector(final NonBlockingConnection connection)
{
- if(selectionInterestRequiresUpdate(connection))
+ SelectionTask selectionTask = connection.getSelectionTask();
+ if(selectionTask == null)
+ {
+ throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
+ }
+
+ if (selectionInterestRequiresUpdate(connection) || connection.getTicker().getModified())
{
- SelectionTask selectionTask = connection.getSelectionTask();
- if(selectionTask == null)
- {
- throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
- }
selectionTask.getUnregisteredConnections().add(connection);
selectionTask.wakeup();
}
@@ -698,10 +701,5 @@ class SelectorThread extends Thread
{
_workQueue.add(new ConnectionProcessor(_scheduler, connection));
}
- SelectionTask selectionTask = connection.getSelectionTask();
- if (selectionTask != null)
- {
- selectionTask.wakeup();
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org