You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/11/17 14:47:19 UTC

svn commit: r1770200 - in /qpid/java/branches/6.0.x: ./ broker-core/src/main/java/org/apache/qpid/server/transport/ common/src/main/java/org/apache/qpid/transport/network/

Author: orudyy
Date: Thu Nov 17 14:47:19 2016
New Revision: 1770200

URL: http://svn.apache.org/viewvc?rev=1770200&view=rev
Log:
QPID-7508: [Java Broker] Ensure that returning a connection to the selectors takes a change in ticker state into account

Partially merged from 6.1.x using:
svn merge -c 1769597 ^/qpid/java/trunk
Only the changes that ensure a returning connection is re-scheduled when its ticker changes were ported.

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Nov 17 14:47:19 2016
@@ -9,6 +9,6 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/branches/6.1.x:1767487
-/qpid/java/trunk
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1758980,1759774,1759783,1760032,1760337,1760522,1760546,1763653,1763966,1763988,1765350,1765609,1765828,1766032,1766547,1766553,1766796
+/qpid/java/branches/6.1.x:1767487,1770199
+/qpid/java/trunk
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732465,1732525,1732812,1733467,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731,1738914,1741702,1742257,1742284,1742339,1742544,1742900,1742926,1743161,1743228,1743383,1743982,1744012-1744013,1744046,1744123,1744157,1744276,1744403,1745424,1745450,1746140,1746273,1747526,1748254,1748723,1748818,1749349,1749399,1749482,1749524,1750359-1750360,1750943,1751433,1754251,1754354,1754392,1754429,1754510,1754550,1755561,1755957,1758628,1758640,1758766,1758964,1758980,1759774,1759783,1760032,1760337,1760522,1760546,1763653,1763966,1763988,1765350,1765609,1765828,1766032,1766547,1766553,1766796,1769597
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1770200&r1=1770199&r2=1770200&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Nov 17 14:47:19 2016
@@ -44,6 +44,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.util.SystemUtils;
@@ -130,7 +131,7 @@ public class NonBlockingConnection imple
         return _partialRead;
     }
 
-    Ticker getTicker()
+    AggregateTicker getTicker()
     {
         return _protocolEngine.getAggregateTicker();
     }

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1770200&r1=1770199&r2=1770200&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Nov 17 14:47:19 2016
@@ -43,6 +43,8 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.transport.network.AggregateTicker;
+
 
 class SelectorThread extends Thread
 {
@@ -128,7 +130,9 @@ class SelectorThread extends Thread
             {
                 NonBlockingConnection connection = iterator.next();
 
-                int period = connection.getTicker().getTimeToNextTick(currentTime);
+                final AggregateTicker ticker = connection.getTicker();
+                int period = ticker.getTimeToNextTick(currentTime);
+                ticker.resetModified();
 
                 if (period <= 0 || connection.isStateChanged())
                 {
@@ -559,13 +563,14 @@ class SelectorThread extends Thread
 
     public void returnConnectionToSelector(final NonBlockingConnection connection)
     {
-        if(selectionInterestRequiresUpdate(connection))
+        SelectionTask selectionTask = connection.getSelectionTask();
+        if(selectionTask == null)
+        {
+            throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
+        }
+
+        if (selectionInterestRequiresUpdate(connection) || connection.getTicker().getModified())
         {
-            SelectionTask selectionTask = connection.getSelectionTask();
-            if(selectionTask == null)
-            {
-                throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
-            }
             selectionTask.getUnregisteredConnections().add(connection);
             selectionTask.wakeup();
         }

Modified: qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java?rev=1770200&r1=1770199&r2=1770200&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java (original)
+++ qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java Thu Nov 17 14:47:19 2016
@@ -21,11 +21,13 @@
 package org.apache.qpid.transport.network;
 
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class AggregateTicker implements Ticker
 {
 
     private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean _modified = new AtomicBoolean();
 
     @Override
     public int getTimeToNextTick(final long currentTime)
@@ -57,10 +59,22 @@ public class AggregateTicker implements
     public void addTicker(Ticker ticker)
     {
         _tickers.add(ticker);
+        _modified.set(true);
     }
 
     public void removeTicker(Ticker ticker)
     {
         _tickers.remove(ticker);
+        _modified.set(true);
+    }
+
+    public boolean getModified()
+    {
+        return _modified.get();
+    }
+
+    public void resetModified()
+    {
+        _modified.set(false);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org