You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/10 16:27:53 UTC
svn commit: r1769148 - in /qpid/java/branches/remove-queue-runner:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/...
Author: lquack
Date: Thu Nov 10 16:27:52 2016
New Revision: 1769148
URL: http://svn.apache.org/viewvc?rev=1769148&view=rev
Log:
remove ConsumerTarget StateChangeListeners
Removed:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -23,10 +23,7 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Futures;
@@ -42,17 +39,20 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.StateChangeListener;
-public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
+public abstract class AbstractConsumerTarget implements ConsumerTarget
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
- private final AtomicReference<State> _state;
-
- private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
- CopyOnWriteArraySet<>();
+ private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[(** Multi-Queue **)] ";
+ }
+ };
+ private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
- private final AtomicInteger _stateActivates = new AtomicInteger();
private final boolean _isMultiQueue;
private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
@@ -60,23 +60,31 @@ public abstract class AbstractConsumerTa
private Iterator<ConsumerImpl> _pullIterator;
private boolean _notifyWorkDesired;
- protected AbstractConsumerTarget(final State initialState,
- final boolean isMultiQueue,
+ protected AbstractConsumerTarget(final boolean isMultiQueue,
final AMQPConnection<?> amqpConnection)
{
- _state = new AtomicReference<State>(initialState);
_isMultiQueue = isMultiQueue;
- _suspendedConsumerLoggingTicker = isMultiQueue
- ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
- {
- @Override
- protected void log(final long period)
- {
- amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
- }
- }
- : null;
+ _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+ {
+ @Override
+ protected void log(final long period)
+ {
+ amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
+ }
+ };
+ }
+
+ private LogSubject getLogSubject()
+ {
+ if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject)
+ {
+ return (LogSubject) _consumers.get(0);
+ }
+ else
+ {
+ return MULTI_QUEUE_LOG_SUBJECT;
+ }
}
public boolean isMultiQueue()
@@ -94,16 +102,20 @@ public abstract class AbstractConsumerTa
{
if (desired != _notifyWorkDesired)
{
- // TODO - remove once queue is smarter
- if (desired)
- {
- updateState(State.SUSPENDED, State.ACTIVE);
- }
- else
+ if(_suspendedConsumerLoggingTicker != null)
{
- updateState(State.ACTIVE, State.SUSPENDED);
+ if (desired)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ }
+ else
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ }
}
+ // TODO - remove once queue is smarter
for (ConsumerImpl consumer : _consumers)
{
consumer.setNotifyWorkDesired(desired);
@@ -193,58 +205,6 @@ public abstract class AbstractConsumerTa
return _state.get();
}
- protected final boolean updateState(State from, State to)
- {
- if(_state.compareAndSet(from, to))
- {
- if(!_stateChangeListeners.isEmpty())
- {
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
- {
- listener.stateChanged(this, from, to);
- }
- }
- if(_suspendedConsumerLoggingTicker != null)
- {
- if (to == State.SUSPENDED)
- {
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- }
- return true;
- }
- else
- {
- return false;
- }
- }
-
- @Override
- public final void notifyCurrentState()
- {
-
- for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
- {
- State state = getState();
- listener.stateChanged(this, state, state);
- }
- }
- public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
- {
- _stateChangeListeners.add(listener);
- }
-
- @Override
- public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
- {
- _stateChangeListeners.remove(listener);
- }
-
@Override
public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
@@ -311,38 +271,27 @@ public abstract class AbstractConsumerTa
final public boolean close()
{
- boolean closed = false;
- State state = getState();
- List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
- _consumers.clear();
-
- while(!closed && state != State.CLOSED)
+ if (_state.compareAndSet(State.OPEN, State.CLOSED))
{
- closed = updateState(state, State.CLOSED);
- if(!closed)
+ List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+ _consumers.clear();
+
+ setNotifyWorkDesired(false);
+
+ for (ConsumerImpl consumer : consumers)
{
- state = getState();
+ consumer.close();
+ }
+ if (_suspendedConsumerLoggingTicker != null)
+ {
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
- }
- setNotifyWorkDesired(false);
- for (ConsumerImpl consumer : consumers)
- {
- consumer.close();
+ return true;
}
- if(_suspendedConsumerLoggingTicker != null)
+ else
{
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+ return false;
}
-
- return closed;
-
- }
-
- @Override
- public String toLogString()
- {
-
- return "[(** Multi-Queue **)] ";
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -25,16 +25,11 @@ import com.google.common.util.concurrent
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
public interface ConsumerTarget
{
-
-
void acquisitionRemoved(MessageInstance node);
- void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
boolean processPending();
String getTargetAddress();
@@ -49,7 +44,7 @@ public interface ConsumerTarget
enum State
{
- ACTIVE, SUSPENDED, CLOSED
+ OPEN, CLOSED
}
State getState();
@@ -58,10 +53,6 @@ public interface ConsumerTarget
ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
- void notifyCurrentState();
-
- void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
-
long getUnacknowledgedBytes();
long getUnacknowledgedMessages();
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov 10 16:27:52 2016
@@ -294,6 +294,16 @@ public abstract class AbstractQueue<X ex
else
{
_activeSubscriberCount.decrementAndGet();
+
+ final ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
+ {
+ final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+ if (s != null && s.getPriority() < consumer.getPriority())
+ {
+ s.notifyWork();
+ }
+ }
}
}
@@ -2035,10 +2045,6 @@ public abstract class AbstractQueue<X ex
{
if(getNextAvailableEntry(consumer) != null && consumer.isNotifyWorkDesired())
{
- final ConsumerTarget target = consumer.getTarget();
- // if the higher priority consumer later becomes suspended we should try notifying this
- // consumer again
- target.addStateListener(new BlockedOnHigherPriorityListener(sub, target));
return false;
}
}
@@ -2273,52 +2279,6 @@ public abstract class AbstractQueue<X ex
}
}
- private static class BlockedOnHigherPriorityListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
- {
- private final QueueConsumer<?> _sub;
- private final ConsumerTarget _target;
-
- public BlockedOnHigherPriorityListener(final QueueConsumer<?> sub, final ConsumerTarget target)
- {
- _sub = sub;
- _target = target;
- }
-
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if(newState != ConsumerTarget.State.ACTIVE)
- {
- _sub.notifyWork();
- _target.removeStateChangeListener(this);
- }
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o)
- {
- return true;
- }
- if (o == null || getClass() != o.getClass())
- {
- return false;
- }
- final BlockedOnHigherPriorityListener that = (BlockedOnHigherPriorityListener) o;
- return Objects.equals(_sub, that._sub) &&
- Objects.equals(_target, that._target);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(_sub, _target);
- }
- }
-
abstract class BaseMessageContent implements Content, CustomRestHeaders
{
public static final int UNLIMITED = -1;
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov 10 16:27:52 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
import java.text.MessageFormat;
-import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -53,11 +52,9 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
@@ -68,7 +65,6 @@ class QueueConsumerImpl
implements QueueConsumer<QueueConsumerImpl>, LogSubject
{
private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
- private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final long _consumerNumber;
private final long _createTime = System.currentTimeMillis();
@@ -85,28 +81,9 @@ class QueueConsumerImpl
private final Object _sessionReference;
private final AbstractQueue _queue;
- private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-
- static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
- new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
-
- static
- {
- STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
- STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
- STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
- }
-
private final ConsumerTarget _target;
- private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
private volatile QueueContext _queueContext;
- private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
- {
- public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
- {
- // no-op
- }
- };
+
@ManagedAttributeField
private boolean _exclusive;
@ManagedAttributeField
@@ -146,29 +123,6 @@ class QueueConsumerImpl
open();
setupLogging();
-
- _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- targetStateChanged(oldState, newState);
- }
- };
- _target.addStateListener(_listener);
-
- _suspendedConsumerLoggingTicker = target.isMultiQueue()
- ? null
- : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
- {
- @Override
- protected void log(final long period)
- {
- getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
- }
- };
}
private static Map<String, Object> createAttributeMap(String name,
@@ -205,43 +159,6 @@ class QueueConsumerImpl
return attributes;
}
- private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
- {
- if(oldState != newState)
- {
- if(newState == ConsumerTarget.State.CLOSED)
- {
- if(_targetClosed.compareAndSet(false,true))
- {
- getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
- }
- }
-
- if(_suspendedConsumerLoggingTicker != null)
- {
- if (newState == ConsumerTarget.State.SUSPENDED)
- {
- _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
- getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
- }
- else
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
- }
- }
-
- if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
- {
- closeAsync();
- }
- final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
- if(stateListener != null)
- {
- stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
- }
- }
-
@Override
public ConsumerTarget getTarget()
{
@@ -301,6 +218,8 @@ class QueueConsumerImpl
{
if(_closed.compareAndSet(false,true))
{
+ getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
+
_waitingOnCreditMessageListener.remove();
return doAfter(_target.consumerRemoved(this),
@@ -309,14 +228,8 @@ class QueueConsumerImpl
@Override
public void run()
{
- _target.removeStateChangeListener(_listener);
-
_queue.unregisterConsumer(QueueConsumerImpl.this);
- if (_suspendedConsumerLoggingTicker != null)
- {
- getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
- }
deleted();
}
});
@@ -365,12 +278,6 @@ class QueueConsumerImpl
}
@Override
- public State getState()
- {
- return STATE_MAP.get(_target.getState());
- }
-
- @Override
public final Queue<?> getQueue()
{
return _queue;
@@ -412,16 +319,6 @@ class QueueConsumerImpl
return _consumerNumber;
}
- public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
- {
- return _stateListener;
- }
-
- public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
- {
- _stateListener = listener;
- }
-
public final QueueContext getQueueContext()
{
return _queueContext;
@@ -434,12 +331,12 @@ class QueueConsumerImpl
public final boolean isActive()
{
- return getState() == State.ACTIVE;
+ return _target.getState() == ConsumerTarget.State.OPEN;
}
public final boolean isClosed()
{
- return getState() == State.DELETED;
+ return _target.getState() == ConsumerTarget.State.CLOSED;
}
public final boolean hasInterest(QueueEntry entry)
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov 10 16:27:52 2016
@@ -122,22 +122,6 @@ public abstract class AbstractSystemMess
{
_name = consumerName;
_target = target;
- _target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if(newState == ConsumerTarget.State.ACTIVE)
- {
- if(!_queue.isEmpty())
- {
- _target.notifyWork();
- }
- }
- }
- });
}
@Override
@@ -263,7 +247,6 @@ public abstract class AbstractSystemMess
return _name;
}
-
public void send(final InternalMessage response)
{
_queue.add(new PropertiesMessageInstance(this, response));
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -26,10 +26,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -51,44 +48,25 @@ import org.apache.qpid.server.protocol.C
import org.apache.qpid.server.queue.AbstractQueue;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.transport.network.Ticker;
public class TestConsumerTarget implements ConsumerTarget
{
- private final List<String> _messageIds;
private boolean _closed = false;
private String tag = "mocktag";
private Queue<?> queue = null;
- private StateChangeListener<ConsumerTarget, State> _listener = null;
- private State _state = State.ACTIVE;
+ private State _state = State.OPEN;
private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
- private final Lock _stateChangeLock = new ReentrantLock();
private boolean _isActive = true;
private ConsumerImpl _consumer;
- private boolean _messageSent;
private MockSessionModel _sessionModel = new MockSessionModel();
private boolean _notifyDesired;
- public TestConsumerTarget()
- {
- _messageIds = null;
- }
-
- public TestConsumerTarget(List<String> messageIds)
- {
- _messageIds = messageIds;
- }
-
public boolean close()
{
_closed = true;
- if (_listener != null)
- {
- _listener.stateChanged(this, _state, State.CLOSED);
- }
_state = State.CLOSED;
updateNotifyWorkDesired();
return true;
@@ -143,7 +121,6 @@ public class TestConsumerTarget implemen
public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
{
- _messageSent = true;
long size = entry.getMessage().getSize();
if (_messages.contains(entry))
{
@@ -194,39 +171,13 @@ public class TestConsumerTarget implemen
return Futures.immediateFuture(null);
}
- @Override
- public void notifyCurrentState()
- {
-
- }
-
public void setState(State state)
{
- State oldState = _state;
_state = state;
- if(_listener != null)
- {
- _listener.stateChanged(this, oldState, state);
- }
updateNotifyWorkDesired();
}
@Override
- public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
- {
- _listener = listener;
- }
-
- @Override
- public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
- {
- if(_listener == listener)
- {
- _listener = null;
- }
- }
-
- @Override
public boolean processPending()
{
AbstractQueue.MessageContainer messageContainer = _consumer.pullMessage();
@@ -276,7 +227,7 @@ public class TestConsumerTarget implemen
@Override
public boolean isNotifyWorkDesired()
{
- return _state == State.ACTIVE;
+ return _state == State.OPEN;
}
@Override
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Nov 10 16:27:52 2016
@@ -65,74 +65,6 @@ public class StandardQueueTest extends A
getQueue().isDeleted());
}
- public void testActiveConsumerCount() throws Exception
- {
-
- Map<String,Object> queueAttributes = new HashMap<>();
- queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
- queueAttributes.put(Queue.OWNER, "testOwner");
- final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
- queue.open();
- //verify adding an active consumer increases the count
- final TestConsumerTarget consumer1 = new TestConsumerTarget();
- consumer1.setActive(true);
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
- queue.addConsumer(consumer1,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify adding an inactive consumer doesn't increase the count
- final TestConsumerTarget consumer2 = new TestConsumerTarget();
- consumer2.setActive(false);
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
- queue.addConsumer(consumer2,
- null,
- createMessage(-1l).getClass(),
- "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES), 0);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify behaviour in face of expected state changes:
-
- //verify a consumer going suspended->active increases the count
- consumer2.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 2, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->suspended decreases the count
- consumer2.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going suspended->closed doesn't change the count
- consumer2.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->active doesn't change the count
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- //verify a consumer going suspended->suspended doesn't change the count
- consumer1.setState(ConsumerTarget.State.SUSPENDED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- consumer1.setState(ConsumerTarget.State.ACTIVE);
- assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
- //verify a consumer going active->closed decreases the count
- consumer1.setState(ConsumerTarget.State.CLOSED);
- assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
- }
-
/**
* Tests that entry in dequeued state are not enqueued and not delivered to consumer
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov 10 16:27:52 2016
@@ -114,7 +114,7 @@ public class ConsumerTarget_0_10 extends
Map<String, Object> arguments,
boolean multiQueue)
{
- super(State.SUSPENDED, multiQueue, session.getAMQPConnection());
+ super(multiQueue, session.getAMQPConnection());
_session = session;
_postIdSettingAction = new AddMessageDispositionListenerAction(session);
_acceptMode = acceptMode;
@@ -496,12 +496,8 @@ public class ConsumerTarget_0_10 extends
public boolean allocateCredit(ServerMessage message)
{
- boolean hasCredit = _creditManager.hasCredit();
boolean creditAllocated = _creditManager.useCreditForMessage(message.getSize());
- if(_creditManager.hasCredit() != hasCredit)
- {
- updateNotifyWorkDesired();
- }
+ updateNotifyWorkDesired();
return creditAllocated;
}
@@ -512,12 +508,8 @@ public class ConsumerTarget_0_10 extends
void restoreCredit(int count, long size)
{
- boolean hasCredit = _creditManager.hasCredit();
_creditManager.restoreCredit(count, size);
- if(_creditManager.hasCredit() != hasCredit)
- {
- updateNotifyWorkDesired();
- }
+ updateNotifyWorkDesired();
}
@@ -529,19 +521,13 @@ public class ConsumerTarget_0_10 extends
public void stop()
{
- FlowCreditManager_0_10 creditManager = getCreditManager();
- boolean hasCredit = creditManager.hasCredit();
- creditManager.clearCredit();
- if(hasCredit)
- {
- updateNotifyWorkDesired();
- }
+ getCreditManager().clearCredit();
+ updateNotifyWorkDesired();
}
public void addCredit(MessageCreditUnit unit, long value)
{
FlowCreditManager_0_10 creditManager = getCreditManager();
- boolean hasCredit = creditManager.hasCredit();
switch (unit)
{
case MESSAGE:
@@ -552,17 +538,7 @@ public class ConsumerTarget_0_10 extends
creditManager.addCredit(0l, value);
break;
}
-
- boolean newHasCredit = creditManager.hasCredit();
- if(newHasCredit)
- {
- updateState(State.SUSPENDED, State.ACTIVE);
- }
- if(hasCredit != newHasCredit)
- {
- updateNotifyWorkDesired();
- }
-
+ updateNotifyWorkDesired();
}
public void setFlowMode(MessageFlowMode flowMode)
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Nov 10 16:27:52 2016
@@ -298,7 +298,7 @@ public abstract class ConsumerTarget_0_8
RecordDeliveryMethod recordMethod,
boolean multiQueue)
{
- super(State.ACTIVE, multiQueue, channel.getAMQPConnection());
+ super(multiQueue, channel.getAMQPConnection());
_channel = channel;
_consumerTag = consumerTag;
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov 10 16:27:52 2016
@@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends Abstrac
public ConsumerTarget_1_0(final SendingLink_1_0 link,
boolean acquires)
{
- super(State.SUSPENDED, false, link.getSession().getAMQPConnection());
+ super(false, link.getSession().getAMQPConnection());
_link = link;
_typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
_sectionEncoder = new SectionEncoderImpl(_typeRegistry);
@@ -276,10 +276,8 @@ class ConsumerTarget_1_0 extends Abstrac
ProtocolEngine protocolEngine = getSession().getConnection();
final boolean hasCredit =
_link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
- if (!hasCredit && getState() == State.ACTIVE)
- {
- suspend();
- }
+
+ updateNotifyWorkDesired();
if (hasCredit)
{
@@ -291,25 +289,19 @@ class ConsumerTarget_1_0 extends Abstrac
}
- public void suspend()
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
-
-
public void restoreCredit(final ServerMessage message)
{
final SendingLinkEndpoint endpoint = _link.getEndpoint();
endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
+ updateNotifyWorkDesired();
}
public void queueEmpty()
{
if(_link.drained())
{
- updateState(State.ACTIVE, State.SUSPENDED);
+ updateNotifyWorkDesired();
}
-
}
public void flowStateChanged()
@@ -319,7 +311,6 @@ class ConsumerTarget_1_0 extends Abstrac
ProtocolEngine protocolEngine = getSession().getConnection();
if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
- updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Nov 10 16:27:52 2016
@@ -588,12 +588,6 @@ public class SendingLink_1_0 implements
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
-
- if(_consumer.isActive())
- {
- _target.suspend();
- }
-
_linkAttachment = linkAttachment;
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Nov 10 16:27:52 2016
@@ -989,19 +989,6 @@ class ManagementNode implements MessageS
final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
target.consumerAdded(managementNodeConsumer);
_consumers.add(managementNodeConsumer);
- target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
- {
- @Override
- public void stateChanged(final ConsumerTarget object,
- final ConsumerTarget.State oldState,
- final ConsumerTarget.State newState)
- {
- if(newState == ConsumerTarget.State.CLOSED)
- {
- _consumers.remove(managementNodeConsumer);
- }
- }
- });
return managementNodeConsumer;
}
@@ -1049,6 +1036,11 @@ class ManagementNode implements MessageS
return MessageDurability.NEVER;
}
+ void unregisterConsumer(ManagementNodeConsumer managementNodeConsumer)
+ {
+ _consumers.remove(managementNodeConsumer);
+ }
+
private class ConsumedMessageInstance implements MessageInstance
{
private final ServerMessage _message;
Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov 10 16:27:52 2016
@@ -161,6 +161,7 @@ class ManagementNodeConsumer implements
@Override
public void close()
{
+ _managementNode.unregisterConsumer(this);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org