You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/31 14:36:25 UTC

svn commit: r1635768 - in /qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server: consumer/AbstractConsumerTarget.java queue/AbstractQueue.java queue/SubFlushRunner.java

Author: rgodfrey
Date: Fri Oct 31 13:36:25 2014
New Revision: 1635768

URL: http://svn.apache.org/r1635768
Log:
QPID-6204 : [Java Broker] Improve distribution fairness for multi-queue consumers

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Fri Oct 31 13:36:25 2014
@@ -20,8 +20,11 @@
  */
 package org.apache.qpid.server.consumer;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +40,7 @@ public abstract class AbstractConsumerTa
             CopyOnWriteArraySet<>();
 
     private final Lock _stateChangeLock = new ReentrantLock();
+    private final AtomicInteger _stateActivates = new AtomicInteger();
 
 
     protected AbstractConsumerTarget(final State initialState)
@@ -54,9 +58,40 @@ public abstract class AbstractConsumerTa
     {
         if(_state.compareAndSet(from, to))
         {
-            for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+            if (to == State.ACTIVE && _stateChangeListeners.size() > 1)
             {
-                listener.stateChanged(this, from, to);
+                int offset = _stateActivates.incrementAndGet();
+                if (offset >= _stateChangeListeners.size())
+                {
+                    _stateActivates.set(0);
+                    offset = 0;
+                }
+
+                List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>();
+                int pos = 0;
+                for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+                {
+                    if (pos++ < offset)
+                    {
+                        holdovers.add(listener);
+                    }
+                    else
+                    {
+                        listener.stateChanged(this, from, to);
+                    }
+                }
+                for (StateChangeListener<ConsumerTarget, State> listener : holdovers)
+                {
+                    listener.stateChanged(this, from, to);
+                }
+
+            }
+            else
+            {
+                for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+                {
+                    listener.stateChanged(this, from, to);
+                }
             }
             return true;
         }
@@ -68,6 +103,7 @@ public abstract class AbstractConsumerTa
 
     public final void notifyCurrentState()
     {
+
         for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
         {
             State state = getState();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Oct 31 13:36:25 2014
@@ -191,7 +191,7 @@ public abstract class AbstractQueue<X ex
             Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
 
 
-    static final int MAX_ASYNC_DELIVERIES = 80;
+    private int _maxAsyncDeliveries;
 
 
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
@@ -445,6 +445,7 @@ public abstract class AbstractQueue<X ex
             _messageGroupManager = null;
         }
 
+        _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
         updateAlertChecks();
     }
 
@@ -1536,7 +1537,6 @@ public abstract class AbstractQueue<X ex
         return _virtualHost.getEventLogger();
     }
 
-
     public static interface QueueEntryFilter
     {
         public boolean accept(QueueEntry entry);
@@ -1892,7 +1892,7 @@ public abstract class AbstractQueue<X ex
     boolean flushConsumer(QueueConsumer<?> sub, long iterations)
     {
         boolean atTail = false;
-        final boolean keepSendLockHeld = iterations <=  AbstractQueue.MAX_ASYNC_DELIVERIES;
+        final boolean keepSendLockHeld = iterations <=  getMaxAsyncDeliveries();
         boolean queueEmpty = false;
 
         try
@@ -2128,7 +2128,7 @@ public abstract class AbstractQueue<X ex
         boolean deliveryIncomplete = true;
 
         boolean lastLoop = false;
-        int iterations = MAX_ASYNC_DELIVERIES;
+        int iterations = getMaxAsyncDeliveries();
 
         final int numSubs = _consumerList.size();
 
@@ -2956,6 +2956,13 @@ public abstract class AbstractQueue<X ex
         _virtualHost.getSecurityManager().authoriseUpdate(this);
     }
 
+    int getMaxAsyncDeliveries()
+    {
+        return _maxAsyncDeliveries;
+    }
+
+
+
     private static final String[] NON_NEGATIVE_NUMBERS = {
         ALERT_REPEAT_GAP,
         ALERT_THRESHOLD_MESSAGE_AGE,

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Oct 31 13:36:25 2014
@@ -49,7 +49,6 @@ class SubFlushRunner implements Runnable
     private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
 
 
-    private static final long ITERATIONS = AbstractQueue.MAX_ASYNC_DELIVERIES;
     private final AtomicBoolean _stateChange = new AtomicBoolean();
 
     public SubFlushRunner(QueueConsumerImpl sub)
@@ -70,7 +69,7 @@ class SubFlushRunner implements Runnable
                     _stateChange.set(false);
                     try
                     {
-                        complete = getQueue().flushConsumer(_sub, ITERATIONS);
+                        complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries());
                     }
                     catch (ConnectionScopedRuntimeException | TransportException  e)
                     {
@@ -102,9 +101,9 @@ class SubFlushRunner implements Runnable
         }
     }
 
-    private AbstractQueue getQueue()
+    private AbstractQueue<?> getQueue()
     {
-        return (AbstractQueue) _sub.getQueue();
+        return (AbstractQueue<?>) _sub.getQueue();
     }
 
     public String toString()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org