You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/11/17 14:30:14 UTC

svn commit: r1770199 - in /qpid/java/branches/6.1.x: ./ broker-core/src/main/java/org/apache/qpid/server/transport/

Author: orudyy
Date: Thu Nov 17 14:30:14 2016
New Revision: 1770199

URL: http://svn.apache.org/viewvc?rev=1770199&view=rev
Log:
QPID-7508: [Java Broker] Ensure that returning a connection to the selectors pays attention to a change in ticker state

merged from trunk using
svn merge -c 1769597 ^/qpid/java/trunk

Modified:
    qpid/java/branches/6.1.x/   (props changed)
    qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
    qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Propchange: qpid/java/branches/6.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 17 14:30:14 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139
+/qpid/java/trunk:1766544,1766547,1766553,1766666,1766796-1766797,1766806,1767251,1767267-1767268,1767275,1767310,1767326,1767329,1767332,1767514,1767523,1767738,1767825,1767847-1767849,1767882,1767909,1767914,1768016-1768017,1768065,1768643,1768704,1768854,1768875,1768914,1768963,1768967,1768976,1769007,1769009,1769087,1769138-1769139,1769597
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java?rev=1770199&r1=1770198&r2=1770199&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java (original)
+++ qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java Thu Nov 17 14:30:14 2016
@@ -20,6 +20,7 @@
 package org.apache.qpid.server.transport;
 
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.transport.network.Ticker;
 
@@ -27,6 +28,7 @@ public class AggregateTicker implements
 {
 
     private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean _modified = new AtomicBoolean();
 
     @Override
     public int getTimeToNextTick(final long currentTime)
@@ -58,19 +60,26 @@ public class AggregateTicker implements
         return nextTick;
     }
 
-    public CopyOnWriteArrayList<Ticker> getTickers()
-    {
-        return _tickers;
-    }
-
     public void addTicker(Ticker ticker)
     {
         _tickers.add(ticker);
+        _modified.set(true);
     }
 
     public void removeTicker(Ticker ticker)
     {
         _tickers.remove(ticker);
+        _modified.set(true);
+    }
+
+    public boolean getModified()
+    {
+        return _modified.get();
+    }
+
+    public void resetModified()
+    {
+        _modified.set(false);
     }
 
     @Override

Modified: qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1770199&r1=1770198&r2=1770199&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov 17 14:30:14 2016
@@ -133,7 +133,7 @@ public class NonBlockingConnection imple
         return _partialRead;
     }
 
-    Ticker getTicker()
+    AggregateTicker getTicker()
     {
         return _protocolEngine.getAggregateTicker();
     }

Modified: qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1770199&r1=1770198&r2=1770199&view=diff
==============================================================================
--- qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/6.1.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Nov 17 14:30:14 2016
@@ -138,7 +138,9 @@ class SelectorThread extends Thread
             {
                 NonBlockingConnection connection = iterator.next();
 
-                int period = connection.getTicker().getTimeToNextTick(currentTime);
+                final AggregateTicker ticker = connection.getTicker();
+                int period = ticker.getTimeToNextTick(currentTime);
+                ticker.resetModified();
 
                 if (period <= 0 || connection.isStateChanged())
                 {
@@ -616,13 +618,14 @@ class SelectorThread extends Thread
 
     public void returnConnectionToSelector(final NonBlockingConnection connection)
     {
-        if(selectionInterestRequiresUpdate(connection))
+        SelectionTask selectionTask = connection.getSelectionTask();
+        if(selectionTask == null)
+        {
+            throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
+        }
+
+        if (selectionInterestRequiresUpdate(connection) || connection.getTicker().getModified())
         {
-            SelectionTask selectionTask = connection.getSelectionTask();
-            if(selectionTask == null)
-            {
-                throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
-            }
             selectionTask.getUnregisteredConnections().add(connection);
             selectionTask.wakeup();
         }
@@ -698,10 +701,5 @@ class SelectorThread extends Thread
          {
              _workQueue.add(new ConnectionProcessor(_scheduler, connection));
          }
-         SelectionTask selectionTask = connection.getSelectionTask();
-         if (selectionTask != null)
-         {
-             selectionTask.wakeup();
-         }
      }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org