You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/31 14:36:25 UTC
svn commit: r1635768 - in
/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server:
consumer/AbstractConsumerTarget.java queue/AbstractQueue.java
queue/SubFlushRunner.java
Author: rgodfrey
Date: Fri Oct 31 13:36:25 2014
New Revision: 1635768
URL: http://svn.apache.org/r1635768
Log:
QPID-6204 : [Java Broker] Improve distribution fairness for multi-queue consumers
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Fri Oct 31 13:36:25 2014
@@ -20,8 +20,11 @@
*/
package org.apache.qpid.server.consumer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +40,7 @@ public abstract class AbstractConsumerTa
CopyOnWriteArraySet<>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private final AtomicInteger _stateActivates = new AtomicInteger();
protected AbstractConsumerTarget(final State initialState)
@@ -54,9 +58,40 @@ public abstract class AbstractConsumerTa
{
if(_state.compareAndSet(from, to))
{
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ if (to == State.ACTIVE && _stateChangeListeners.size() > 1)
{
- listener.stateChanged(this, from, to);
+ int offset = _stateActivates.incrementAndGet();
+ if (offset >= _stateChangeListeners.size())
+ {
+ _stateActivates.set(0);
+ offset = 0;
+ }
+
+ List<StateChangeListener<ConsumerTarget, State>> holdovers = new ArrayList<>();
+ int pos = 0;
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ if (pos++ < offset)
+ {
+ holdovers.add(listener);
+ }
+ else
+ {
+ listener.stateChanged(this, from, to);
+ }
+ }
+ for (StateChangeListener<ConsumerTarget, State> listener : holdovers)
+ {
+ listener.stateChanged(this, from, to);
+ }
+
+ }
+ else
+ {
+ for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
+ {
+ listener.stateChanged(this, from, to);
+ }
}
return true;
}
@@ -68,6 +103,7 @@ public abstract class AbstractConsumerTa
public final void notifyCurrentState()
{
+
for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
{
State state = getState();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Oct 31 13:36:25 2014
@@ -191,7 +191,7 @@ public abstract class AbstractQueue<X ex
Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
- static final int MAX_ASYNC_DELIVERIES = 80;
+ private int _maxAsyncDeliveries;
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
@@ -445,6 +445,7 @@ public abstract class AbstractQueue<X ex
_messageGroupManager = null;
}
+ _maxAsyncDeliveries = getContextValue(Integer.class, Queue.MAX_ASYNCHRONOUS_DELIVERIES);
updateAlertChecks();
}
@@ -1536,7 +1537,6 @@ public abstract class AbstractQueue<X ex
return _virtualHost.getEventLogger();
}
-
public static interface QueueEntryFilter
{
public boolean accept(QueueEntry entry);
@@ -1892,7 +1892,7 @@ public abstract class AbstractQueue<X ex
boolean flushConsumer(QueueConsumer<?> sub, long iterations)
{
boolean atTail = false;
- final boolean keepSendLockHeld = iterations <= AbstractQueue.MAX_ASYNC_DELIVERIES;
+ final boolean keepSendLockHeld = iterations <= getMaxAsyncDeliveries();
boolean queueEmpty = false;
try
@@ -2128,7 +2128,7 @@ public abstract class AbstractQueue<X ex
boolean deliveryIncomplete = true;
boolean lastLoop = false;
- int iterations = MAX_ASYNC_DELIVERIES;
+ int iterations = getMaxAsyncDeliveries();
final int numSubs = _consumerList.size();
@@ -2956,6 +2956,13 @@ public abstract class AbstractQueue<X ex
_virtualHost.getSecurityManager().authoriseUpdate(this);
}
+ int getMaxAsyncDeliveries()
+ {
+ return _maxAsyncDeliveries;
+ }
+
+
+
private static final String[] NON_NEGATIVE_NUMBERS = {
ALERT_REPEAT_GAP,
ALERT_THRESHOLD_MESSAGE_AGE,
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1635768&r1=1635767&r2=1635768&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Oct 31 13:36:25 2014
@@ -49,7 +49,6 @@ class SubFlushRunner implements Runnable
private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
- private static final long ITERATIONS = AbstractQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
public SubFlushRunner(QueueConsumerImpl sub)
@@ -70,7 +69,7 @@ class SubFlushRunner implements Runnable
_stateChange.set(false);
try
{
- complete = getQueue().flushConsumer(_sub, ITERATIONS);
+ complete = getQueue().flushConsumer(_sub, getQueue().getMaxAsyncDeliveries());
}
catch (ConnectionScopedRuntimeException | TransportException e)
{
@@ -102,9 +101,9 @@ class SubFlushRunner implements Runnable
}
}
- private AbstractQueue getQueue()
+ private AbstractQueue<?> getQueue()
{
- return (AbstractQueue) _sub.getQueue();
+ return (AbstractQueue<?>) _sub.getQueue();
}
public String toString()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org