You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/10 16:27:53 UTC

svn commit: r1769148 - in /qpid/java/branches/remove-queue-runner: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/...

Author: lquack
Date: Thu Nov 10 16:27:52 2016
New Revision: 1769148

URL: http://svn.apache.org/viewvc?rev=1769148&view=rev
Log:
remove ConsumerTarget StateChangeListeners

Removed:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java
Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -23,10 +23,7 @@ package org.apache.qpid.server.consumer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Futures;
@@ -42,17 +39,20 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.StateChangeListener;
 
-public abstract class AbstractConsumerTarget implements ConsumerTarget, LogSubject
+public abstract class AbstractConsumerTarget implements ConsumerTarget
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);
-    private final AtomicReference<State> _state;
-
-    private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
-            CopyOnWriteArraySet<>();
+    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
+    {
+        @Override
+        public String toLogString()
+        {
+            return "[(** Multi-Queue **)] ";
+        }
+    };
+    private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
 
-    private final AtomicInteger _stateActivates = new AtomicInteger();
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
@@ -60,23 +60,31 @@ public abstract class AbstractConsumerTa
     private Iterator<ConsumerImpl> _pullIterator;
     private boolean _notifyWorkDesired;
 
-    protected AbstractConsumerTarget(final State initialState,
-                                     final boolean isMultiQueue,
+    protected AbstractConsumerTarget(final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
     {
-        _state = new AtomicReference<State>(initialState);
         _isMultiQueue = isMultiQueue;
-        _suspendedConsumerLoggingTicker = isMultiQueue
-                ? new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
-                {
-                    @Override
-                    protected void log(final long period)
-                    {
-                        amqpConnection.getEventLogger().message(AbstractConsumerTarget.this, SubscriptionMessages.STATE(period));
-                    }
-                }
-                : null;
 
+        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+        {
+            @Override
+            protected void log(final long period)
+            {
+                amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
+            }
+        };
+    }
+
+    private LogSubject getLogSubject()
+    {
+        if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject)
+        {
+            return (LogSubject) _consumers.get(0);
+        }
+        else
+        {
+            return MULTI_QUEUE_LOG_SUBJECT;
+        }
     }
 
     public boolean isMultiQueue()
@@ -94,16 +102,20 @@ public abstract class AbstractConsumerTa
     {
         if (desired != _notifyWorkDesired)
         {
-            // TODO - remove once queue is smarter
-            if (desired)
-            {
-                updateState(State.SUSPENDED, State.ACTIVE);
-            }
-            else
+            if(_suspendedConsumerLoggingTicker != null)
             {
-                updateState(State.ACTIVE, State.SUSPENDED);
+                if (desired)
+                {
+                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+                }
+                else
+                {
+                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+                }
             }
 
+            // TODO - remove once queue is smarter
             for (ConsumerImpl consumer : _consumers)
             {
                 consumer.setNotifyWorkDesired(desired);
@@ -193,58 +205,6 @@ public abstract class AbstractConsumerTa
         return _state.get();
     }
 
-    protected final boolean updateState(State from, State to)
-    {
-        if(_state.compareAndSet(from, to))
-        {
-            if(!_stateChangeListeners.isEmpty())
-            {
-                for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
-                {
-                    listener.stateChanged(this, from, to);
-                }
-            }
-            if(_suspendedConsumerLoggingTicker != null)
-            {
-                if (to == State.SUSPENDED)
-                {
-                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-            }
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    }
-
-    @Override
-    public final void notifyCurrentState()
-    {
-
-        for (StateChangeListener<ConsumerTarget, State> listener : _stateChangeListeners)
-        {
-            State state = getState();
-            listener.stateChanged(this, state, state);
-        }
-    }
-    public final void addStateListener(StateChangeListener<ConsumerTarget, State> listener)
-    {
-        _stateChangeListeners.add(listener);
-    }
-
-    @Override
-    public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
-    {
-        _stateChangeListeners.remove(listener);
-    }
-
     @Override
     public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
@@ -311,38 +271,27 @@ public abstract class AbstractConsumerTa
 
     final public boolean close()
     {
-        boolean closed = false;
-        State state = getState();
-        List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
-        _consumers.clear();
-
-        while(!closed && state != State.CLOSED)
+        if (_state.compareAndSet(State.OPEN, State.CLOSED))
         {
-            closed = updateState(state, State.CLOSED);
-            if(!closed)
+            List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+            _consumers.clear();
+
+            setNotifyWorkDesired(false);
+
+            for (ConsumerImpl consumer : consumers)
             {
-                state = getState();
+                consumer.close();
+            }
+            if (_suspendedConsumerLoggingTicker != null)
+            {
+                getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
             }
-        }
-        setNotifyWorkDesired(false);
 
-        for (ConsumerImpl consumer : consumers)
-        {
-            consumer.close();
+            return true;
         }
-        if(_suspendedConsumerLoggingTicker != null)
+        else
         {
-            getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
+            return false;
         }
-
-        return closed;
-
-    }
-
-    @Override
-    public String toLogString()
-    {
-
-        return "[(** Multi-Queue **)] ";
     }
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -25,16 +25,11 @@ import com.google.common.util.concurrent
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
 
 public interface ConsumerTarget
 {
-
-
     void acquisitionRemoved(MessageInstance node);
 
-    void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
     boolean processPending();
 
     String getTargetAddress();
@@ -49,7 +44,7 @@ public interface ConsumerTarget
 
     enum State
     {
-        ACTIVE, SUSPENDED, CLOSED
+        OPEN, CLOSED
     }
 
     State getState();
@@ -58,10 +53,6 @@ public interface ConsumerTarget
 
     ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
 
-    void notifyCurrentState();
-
-    void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
-
     long getUnacknowledgedBytes();
 
     long getUnacknowledgedMessages();

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Nov 10 16:27:52 2016
@@ -294,6 +294,16 @@ public abstract class AbstractQueue<X ex
         else
         {
             _activeSubscriberCount.decrementAndGet();
+
+            final ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+            while (consumerNodeIterator.advance())
+            {
+                final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+                if (s != null && s.getPriority() < consumer.getPriority())
+                {
+                    s.notifyWork();
+                }
+            }
         }
     }
 
@@ -2035,10 +2045,6 @@ public abstract class AbstractQueue<X ex
             {
                 if(getNextAvailableEntry(consumer) != null && consumer.isNotifyWorkDesired())
                 {
-                    final ConsumerTarget target = consumer.getTarget();
-                    // if the higher priority consumer later becomes suspended we should try notifying this
-                    // consumer again
-                    target.addStateListener(new BlockedOnHigherPriorityListener(sub, target));
                     return false;
                 }
             }
@@ -2273,52 +2279,6 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private static class BlockedOnHigherPriorityListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
-    {
-        private final QueueConsumer<?> _sub;
-        private final ConsumerTarget _target;
-
-        public BlockedOnHigherPriorityListener(final QueueConsumer<?> sub, final ConsumerTarget target)
-        {
-            _sub = sub;
-            _target = target;
-        }
-
-        @Override
-        public void stateChanged(final ConsumerTarget object,
-                                 final ConsumerTarget.State oldState,
-                                 final ConsumerTarget.State newState)
-        {
-            if(newState != ConsumerTarget.State.ACTIVE)
-            {
-                _sub.notifyWork();
-                _target.removeStateChangeListener(this);
-            }
-        }
-
-        @Override
-        public boolean equals(final Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-            final BlockedOnHigherPriorityListener that = (BlockedOnHigherPriorityListener) o;
-            return Objects.equals(_sub, that._sub) &&
-                   Objects.equals(_target, that._target);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(_sub, _target);
-        }
-    }
-
     abstract class BaseMessageContent implements Content, CustomRestHeaders
     {
         public static final int UNLIMITED = -1;

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Thu Nov 10 16:27:52 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
 import java.text.MessageFormat;
-import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -53,11 +52,9 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.security.access.Operation;
@@ -68,7 +65,6 @@ class QueueConsumerImpl
         implements QueueConsumer<QueueConsumerImpl>, LogSubject
 {
     private final static Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
-    private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _consumerNumber;
     private final long _createTime = System.currentTimeMillis();
@@ -85,28 +81,9 @@ class QueueConsumerImpl
     private final Object _sessionReference;
     private final AbstractQueue _queue;
 
-    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-
-    static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
-            new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
-
-    static
-    {
-        STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
-        STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
-        STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
-    }
-
     private final ConsumerTarget _target;
-    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _listener;
     private volatile QueueContext _queueContext;
-    private volatile StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
-    {
-        public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
-        {
-            // no-op
-        }
-    };
+
     @ManagedAttributeField
     private boolean _exclusive;
     @ManagedAttributeField
@@ -146,29 +123,6 @@ class QueueConsumerImpl
         open();
 
         setupLogging();
-
-        _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
-        {
-            @Override
-            public void stateChanged(final ConsumerTarget object,
-                                     final ConsumerTarget.State oldState,
-                                     final ConsumerTarget.State newState)
-            {
-                targetStateChanged(oldState, newState);
-            }
-        };
-        _target.addStateListener(_listener);
-
-        _suspendedConsumerLoggingTicker = target.isMultiQueue()
-                ? null
-                : new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
-                    {
-                        @Override
-                        protected void log(final long period)
-                        {
-                            getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
-                        }
-                    };
     }
 
     private static Map<String, Object> createAttributeMap(String name,
@@ -205,43 +159,6 @@ class QueueConsumerImpl
         return attributes;
     }
 
-    private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState)
-    {
-        if(oldState != newState)
-        {
-            if(newState == ConsumerTarget.State.CLOSED)
-            {
-                if(_targetClosed.compareAndSet(false,true))
-                {
-                    getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
-                }
-            }
-
-            if(_suspendedConsumerLoggingTicker != null)
-            {
-                if (newState == ConsumerTarget.State.SUSPENDED)
-                {
-                    _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-            }
-        }
-
-        if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get())
-        {
-            closeAsync();
-        }
-        final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
-        if(stateListener != null)
-        {
-            stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
-        }
-    }
-
     @Override
     public ConsumerTarget getTarget()
     {
@@ -301,6 +218,8 @@ class QueueConsumerImpl
     {
         if(_closed.compareAndSet(false,true))
         {
+            getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
+
             _waitingOnCreditMessageListener.remove();
 
             return doAfter(_target.consumerRemoved(this),
@@ -309,14 +228,8 @@ class QueueConsumerImpl
                                @Override
                                public void run()
                                {
-                                   _target.removeStateChangeListener(_listener);
-
                                    _queue.unregisterConsumer(QueueConsumerImpl.this);
 
-                                   if (_suspendedConsumerLoggingTicker != null)
-                                   {
-                                       getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
-                                   }
                                    deleted();
                                }
                            });
@@ -365,12 +278,6 @@ class QueueConsumerImpl
     }
 
     @Override
-    public State getState()
-    {
-        return STATE_MAP.get(_target.getState());
-    }
-
-    @Override
     public final Queue<?> getQueue()
     {
         return _queue;
@@ -412,16 +319,6 @@ class QueueConsumerImpl
         return _consumerNumber;
     }
 
-    public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
-    {
-        return _stateListener;
-    }
-
-    public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
-    {
-        _stateListener = listener;
-    }
-
     public final QueueContext getQueueContext()
     {
         return _queueContext;
@@ -434,12 +331,12 @@ class QueueConsumerImpl
 
     public final boolean isActive()
     {
-        return getState() == State.ACTIVE;
+        return _target.getState() == ConsumerTarget.State.OPEN;
     }
 
     public final boolean isClosed()
     {
-        return getState() == State.DELETED;
+        return _target.getState() == ConsumerTarget.State.CLOSED;
     }
 
     public final boolean hasInterest(QueueEntry entry)

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Thu Nov 10 16:27:52 2016
@@ -122,22 +122,6 @@ public abstract class AbstractSystemMess
         {
             _name = consumerName;
             _target = target;
-            _target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
-            {
-                @Override
-                public void stateChanged(final ConsumerTarget object,
-                                         final ConsumerTarget.State oldState,
-                                         final ConsumerTarget.State newState)
-                {
-                    if(newState == ConsumerTarget.State.ACTIVE)
-                    {
-                        if(!_queue.isEmpty())
-                        {
-                            _target.notifyWork();
-                        }
-                    }
-                }
-            });
         }
 
         @Override
@@ -263,7 +247,6 @@ public abstract class AbstractSystemMess
             return _name;
         }
 
-
         public void send(final InternalMessage response)
         {
             _queue.add(new PropertiesMessageInstance(this, response));

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Nov 10 16:27:52 2016
@@ -26,10 +26,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -51,44 +48,25 @@ import org.apache.qpid.server.protocol.C
 import org.apache.qpid.server.queue.AbstractQueue;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.transport.network.Ticker;
 
 public class TestConsumerTarget implements ConsumerTarget
 {
 
-    private final List<String> _messageIds;
     private boolean _closed = false;
     private String tag = "mocktag";
     private Queue<?> queue = null;
-    private StateChangeListener<ConsumerTarget, State> _listener = null;
-    private State _state = State.ACTIVE;
+    private State _state = State.OPEN;
     private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
-    private final Lock _stateChangeLock = new ReentrantLock();
 
     private boolean _isActive = true;
     private ConsumerImpl _consumer;
-    private boolean _messageSent;
     private MockSessionModel _sessionModel = new MockSessionModel();
     private boolean _notifyDesired;
 
-    public TestConsumerTarget()
-    {
-        _messageIds = null;
-    }
-
-    public TestConsumerTarget(List<String> messageIds)
-    {
-        _messageIds = messageIds;
-    }
-
     public boolean close()
     {
         _closed = true;
-        if (_listener != null)
-        {
-            _listener.stateChanged(this, _state, State.CLOSED);
-        }
         _state = State.CLOSED;
         updateNotifyWorkDesired();
         return true;
@@ -143,7 +121,6 @@ public class TestConsumerTarget implemen
 
     public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
     {
-        _messageSent = true;
         long size = entry.getMessage().getSize();
         if (_messages.contains(entry))
         {
@@ -194,39 +171,13 @@ public class TestConsumerTarget implemen
         return Futures.immediateFuture(null);
     }
 
-    @Override
-    public void notifyCurrentState()
-    {
-
-    }
-
     public void setState(State state)
     {
-        State oldState = _state;
         _state = state;
-        if(_listener != null)
-        {
-            _listener.stateChanged(this, oldState, state);
-        }
         updateNotifyWorkDesired();
     }
 
     @Override
-    public void addStateListener(final StateChangeListener<ConsumerTarget, State> listener)
-    {
-        _listener = listener;
-    }
-
-    @Override
-    public void removeStateChangeListener(final StateChangeListener<ConsumerTarget, State> listener)
-    {
-        if(_listener == listener)
-        {
-            _listener = null;
-        }
-    }
-
-    @Override
     public boolean processPending()
     {
         AbstractQueue.MessageContainer messageContainer = _consumer.pullMessage();
@@ -276,7 +227,7 @@ public class TestConsumerTarget implemen
     @Override
     public boolean isNotifyWorkDesired()
     {
-        return _state == State.ACTIVE;
+        return _state == State.OPEN;
     }
 
     @Override

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Nov 10 16:27:52 2016
@@ -65,74 +65,6 @@ public class StandardQueueTest extends A
                    getQueue().isDeleted());
     }
 
-    public void testActiveConsumerCount() throws Exception
-    {
-
-        Map<String,Object> queueAttributes = new HashMap<>();
-        queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
-        queueAttributes.put(Queue.OWNER, "testOwner");
-        final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
-        queue.open();
-        //verify adding an active consumer increases the count
-        final TestConsumerTarget consumer1 = new TestConsumerTarget();
-        consumer1.setActive(true);
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-        queue.addConsumer(consumer1,
-                          null,
-                          createMessage(-1l).getClass(),
-                          "test",
-                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify adding an inactive consumer doesn't increase the count
-        final TestConsumerTarget consumer2 = new TestConsumerTarget();
-        consumer2.setActive(false);
-        consumer2.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-        queue.addConsumer(consumer2,
-                          null,
-                          createMessage(-1l).getClass(),
-                          "test",
-                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify behaviour in face of expected state changes:
-
-        //verify a consumer going suspended->active increases the count
-        consumer2.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 2, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->suspended decreases the count
-        consumer2.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going suspended->closed doesn't change the count
-        consumer2.setState(ConsumerTarget.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->active doesn't change the count
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        consumer1.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going suspended->suspended doesn't change the count
-        consumer1.setState(ConsumerTarget.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-        consumer1.setState(ConsumerTarget.State.ACTIVE);
-        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
-
-        //verify a consumer going active->closed  decreases the count
-        consumer1.setState(ConsumerTarget.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
-
-    }
-
 
     /**
      * Tests that entry in dequeued state are not enqueued and not delivered to consumer

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Thu Nov 10 16:27:52 2016
@@ -114,7 +114,7 @@ public class ConsumerTarget_0_10 extends
                                Map<String, Object> arguments,
                                boolean multiQueue)
     {
-        super(State.SUSPENDED, multiQueue, session.getAMQPConnection());
+        super(multiQueue, session.getAMQPConnection());
         _session = session;
         _postIdSettingAction = new AddMessageDispositionListenerAction(session);
         _acceptMode = acceptMode;
@@ -496,12 +496,8 @@ public class ConsumerTarget_0_10 extends
 
     public boolean allocateCredit(ServerMessage message)
     {
-        boolean hasCredit = _creditManager.hasCredit();
         boolean creditAllocated = _creditManager.useCreditForMessage(message.getSize());
-        if(_creditManager.hasCredit() != hasCredit)
-        {
-            updateNotifyWorkDesired();
-        }
+        updateNotifyWorkDesired();
         return creditAllocated;
     }
 
@@ -512,12 +508,8 @@ public class ConsumerTarget_0_10 extends
 
     void restoreCredit(int count, long size)
     {
-        boolean hasCredit = _creditManager.hasCredit();
         _creditManager.restoreCredit(count, size);
-        if(_creditManager.hasCredit() != hasCredit)
-        {
-            updateNotifyWorkDesired();
-        }
+        updateNotifyWorkDesired();
     }
 
 
@@ -529,19 +521,13 @@ public class ConsumerTarget_0_10 extends
 
     public void stop()
     {
-        FlowCreditManager_0_10 creditManager = getCreditManager();
-        boolean hasCredit = creditManager.hasCredit();
-        creditManager.clearCredit();
-        if(hasCredit)
-        {
-            updateNotifyWorkDesired();
-        }
+        getCreditManager().clearCredit();
+        updateNotifyWorkDesired();
     }
 
     public void addCredit(MessageCreditUnit unit, long value)
     {
         FlowCreditManager_0_10 creditManager = getCreditManager();
-        boolean hasCredit = creditManager.hasCredit();
         switch (unit)
         {
             case MESSAGE:
@@ -552,17 +538,7 @@ public class ConsumerTarget_0_10 extends
                 creditManager.addCredit(0l, value);
                 break;
         }
-
-        boolean newHasCredit = creditManager.hasCredit();
-        if(newHasCredit)
-        {
-            updateState(State.SUSPENDED, State.ACTIVE);
-        }
-        if(hasCredit != newHasCredit)
-        {
-            updateNotifyWorkDesired();
-        }
-
+        updateNotifyWorkDesired();
     }
 
     public void setFlowMode(MessageFlowMode flowMode)

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Thu Nov 10 16:27:52 2016
@@ -298,7 +298,7 @@ public abstract class ConsumerTarget_0_8
                               RecordDeliveryMethod recordMethod,
                               boolean multiQueue)
     {
-        super(State.ACTIVE, multiQueue, channel.getAMQPConnection());
+        super(multiQueue, channel.getAMQPConnection());
 
         _channel = channel;
         _consumerTag = consumerTag;

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Nov 10 16:27:52 2016
@@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends Abstrac
     public ConsumerTarget_1_0(final SendingLink_1_0 link,
                               boolean acquires)
     {
-        super(State.SUSPENDED, false, link.getSession().getAMQPConnection());
+        super(false, link.getSession().getAMQPConnection());
         _link = link;
         _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
         _sectionEncoder = new SectionEncoderImpl(_typeRegistry);
@@ -276,10 +276,8 @@ class ConsumerTarget_1_0 extends Abstrac
         ProtocolEngine protocolEngine = getSession().getConnection();
         final boolean hasCredit =
                 _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
-        if (!hasCredit && getState() == State.ACTIVE)
-        {
-            suspend();
-        }
+
+        updateNotifyWorkDesired();
 
         if (hasCredit)
         {
@@ -291,25 +289,19 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
 
-    public void suspend()
-    {
-        updateState(State.ACTIVE, State.SUSPENDED);
-    }
-
-
     public void restoreCredit(final ServerMessage message)
     {
         final SendingLinkEndpoint endpoint = _link.getEndpoint();
         endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
+        updateNotifyWorkDesired();
     }
 
     public void queueEmpty()
     {
         if(_link.drained())
         {
-            updateState(State.ACTIVE, State.SUSPENDED);
+            updateNotifyWorkDesired();
         }
-
     }
 
     public void flowStateChanged()
@@ -319,7 +311,6 @@ class ConsumerTarget_1_0 extends Abstrac
         ProtocolEngine protocolEngine = getSession().getConnection();
         if (isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
         {
-            updateState(State.SUSPENDED, State.ACTIVE);
             _transactionId = _link.getTransactionId();
         }
     }

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Nov 10 16:27:52 2016
@@ -588,12 +588,6 @@ public class SendingLink_1_0 implements
 
     public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
     {
-
-        if(_consumer.isActive())
-        {
-            _target.suspend();
-        }
-
         _linkAttachment = linkAttachment;
 
         SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Thu Nov 10 16:27:52 2016
@@ -989,19 +989,6 @@ class ManagementNode implements MessageS
         final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
         target.consumerAdded(managementNodeConsumer);
         _consumers.add(managementNodeConsumer);
-        target.addStateListener(new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
-        {
-            @Override
-            public void stateChanged(final ConsumerTarget object,
-                                     final ConsumerTarget.State oldState,
-                                     final ConsumerTarget.State newState)
-            {
-                if(newState == ConsumerTarget.State.CLOSED)
-                {
-                    _consumers.remove(managementNodeConsumer);
-                }
-            }
-        });
         return managementNodeConsumer;
     }
 
@@ -1049,6 +1036,11 @@ class ManagementNode implements MessageS
         return MessageDurability.NEVER;
     }
 
+    void unregisterConsumer(ManagementNodeConsumer managementNodeConsumer)
+    {
+        _consumers.remove(managementNodeConsumer);
+    }
+
     private class ConsumedMessageInstance implements MessageInstance
     {
         private final ServerMessage _message;

Modified: qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1769148&r1=1769147&r2=1769148&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Thu Nov 10 16:27:52 2016
@@ -161,6 +161,7 @@ class ManagementNodeConsumer implements
     @Override
     public void close()
     {
+        _managementNode.unregisterConsumer(this);
     }
 
 




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org