You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/21 09:36:29 UTC

qpid-broker-j git commit: QPID-7812: Session#getConsumerCount returns number of consumers on all protocols

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 85abb468e -> 92b79cc92


QPID-7812: Session#getConsumerCount returns the number of consumers on all protocols

Also removed the dead consumer collections from the session implementations


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/92b79cc9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/92b79cc9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/92b79cc9

Branch: refs/heads/master
Commit: 92b79cc9255bf70400c09fc3389feb066265c5bb
Parents: 85abb46
Author: Keith Wall <kw...@apache.org>
Authored: Tue Jun 20 17:05:23 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jun 20 17:05:29 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/server/model/Session.java   |  2 -
 .../apache/qpid/server/queue/AbstractQueue.java | 33 ++++++++---
 .../apache/qpid/server/session/AMQPSession.java | 10 ++--
 .../server/session/AbstractAMQPSession.java     | 24 +++++++-
 .../server/protocol/v0_10/ServerSession.java    | 40 +++++--------
 .../protocol/v0_10/ServerSessionDelegate.java   | 13 ++---
 .../server/protocol/v0_10/Session_0_10.java     | 14 -----
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 59 +++++++-------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 18 +-----
 9 files changed, 90 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index dbc87be..271cf84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -56,8 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
     boolean isProducerFlowBlocked();
 
 
-    Collection<? extends Consumer> getConsumers();
-
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
     long getConsumerCount();
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2a9d3b5..1ab48e2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -859,17 +859,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
 
         Object exclusiveOwner = _exclusiveOwner;
+        final AMQPSession<?, T> session = target.getSession();
         switch(_exclusive)
         {
             case CONNECTION:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSession().getAMQPConnection();
-                    addExclusivityConstraint(target.getSession().getAMQPConnection());
+                    exclusiveOwner = session.getAMQPConnection();
+                    addExclusivityConstraint(session.getAMQPConnection());
                 }
                 else
                 {
-                    if(exclusiveOwner != target.getSession().getAMQPConnection())
+                    if(exclusiveOwner != session.getAMQPConnection())
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -878,12 +879,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             case SESSION:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSession();
-                    addExclusivityConstraint(target.getSession());
+                    exclusiveOwner = session;
+                    addExclusivityConstraint(session);
                 }
                 else
                 {
-                    if(exclusiveOwner != target.getSession())
+                    if(exclusiveOwner != session)
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -896,7 +897,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 }
                 break;
             case PRINCIPAL:
-                Principal currentAuthorizedPrincipal = target.getSession().getAMQPConnection().getAuthorizedPrincipal();
+                Principal currentAuthorizedPrincipal = session.getAMQPConnection().getAuthorizedPrincipal();
                 if(exclusiveOwner == null)
                 {
                     exclusiveOwner = currentAuthorizedPrincipal;
@@ -912,11 +913,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             case CONTAINER:
                 if(exclusiveOwner == null)
                 {
-                    exclusiveOwner = target.getSession().getAMQPConnection().getRemoteContainerName();
+                    exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
                 }
                 else
                 {
-                    if(!exclusiveOwner.equals(target.getSession().getAMQPConnection().getRemoteContainerName()))
+                    if(!exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName()))
                     {
                         throw new ConsumerAccessRefused();
                     }
@@ -1016,6 +1017,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
 
+        session.incConsumerCount();
+        addChangeListener(new AbstractConfigurationChangeListener()
+        {
+            @Override
+            public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+            {
+                if (child.equals(consumer))
+                {
+                    session.decConsumerCount();
+                    removeChangeListener(this);
+                }
+            }
+        });
+
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 1cbf643..697a504 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -20,20 +20,16 @@
  */
 package org.apache.qpid.server.session;
 
-import java.util.Collection;
-import java.util.UUID;
-
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.transport.network.Ticker;
+import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSession<S, X>,
                              X extends ConsumerTarget<X>> extends Session<S>,
@@ -70,7 +66,9 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
 
     long getConsumerCount();
 
-    Collection<Consumer<?,X>> getConsumers();
+    void incConsumerCount();
+
+    void decConsumerCount();
 
     /**
      * Return the time the current transaction started.

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index fa9f22a..09aefcc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -29,11 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.Subject;
 
 import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -60,10 +60,9 @@ import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.TransactionTimeoutTicker;
+import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.FutureHelper;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-import org.apache.qpid.server.transport.network.Ticker;
 
 public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
                                           X extends ConsumerTarget<X>>
@@ -84,6 +83,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
     protected final LogSubject _logSubject;
 
     protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
+    private final AtomicInteger _consumerCount = new AtomicInteger();
 
     protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
     private Iterator<AbstractConsumerTarget> _processPendingIterator;
@@ -406,6 +406,24 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
         }
     }
 
+    @Override
+    public final long getConsumerCount()
+    {
+        return _consumerCount.get();
+    }
+
+    @Override
+    public final void incConsumerCount()
+    {
+        _consumerCount.incrementAndGet();
+    }
+
+    @Override
+    public final void decConsumerCount()
+    {
+        _consumerCount.decrementAndGet();
+    }
+
     protected abstract void updateBlockedStateIfNecessary();
 
     public abstract boolean isClosing();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index cd27cbc..7da0d0f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,7 +56,6 @@ import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,16 +78,25 @@ import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v0_10.transport.*;
-import org.apache.qpid.server.protocol.v0_10.transport.Xid;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.protocol.v0_10.transport.Frame;
-import org.apache.qpid.server.txn.*;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.DistributedTransaction;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
@@ -152,8 +160,6 @@ public class ServerSession extends SessionInvoker
     private final AtomicLong _txnCount = new AtomicLong(0);
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
-    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
-
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
 
     public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
@@ -1202,15 +1208,6 @@ public class ServerSession extends SessionInvoker
     }
 
 
-    public void register(final MessageInstanceConsumer<ConsumerTarget_0_10> messageInstanceConsumer)
-    {
-        if(messageInstanceConsumer instanceof Consumer<?,?>)
-        {
-            final Consumer<?,ConsumerTarget_0_10> consumer = (Consumer<?,ConsumerTarget_0_10>) messageInstanceConsumer;
-            _consumers.add(consumer);
-        }
-    }
-
     public ConsumerTarget_0_10 getSubscription(String destination)
     {
         return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
@@ -1727,17 +1724,6 @@ public class ServerSession extends SessionInvoker
         }
     }
 
-    public long getConsumerCount()
-    {
-        return _subscriptions.values().size();
-    }
-
-    public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
-    {
-
-        return Collections.unmodifiableCollection(_consumers);
-    }
-
     public void setModelObject(final Session_0_10 session)
     {
         _modelObject = session;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1c85614..8c3f8c9 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -354,13 +354,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
                         }
                         for(MessageSource source : sources)
                         {
-                            ((ServerSession) session).register(
-                                    source.addConsumer(target,
-                                                       filterManager,
-                                                       MessageTransferMessage.class,
-                                                       destination,
-                                                       options,
-                                                       priority));
+                            source.addConsumer(target,
+                                               filterManager,
+                                               MessageTransferMessage.class,
+                                               destination,
+                                               options,
+                                               priority);
                         }
                         target.updateNotifyWorkDesired();
                     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 649d290..45a6500 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -21,14 +21,12 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import java.security.AccessControlContext;
-import java.util.Collection;
 import java.util.List;
 
 import javax.security.auth.Subject;
 
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.session.AbstractAMQPSession;
@@ -109,18 +107,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
     }
 
     @Override
-    public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
-    {
-        return _serverSession.getConsumers();
-    }
-
-    @Override
-    public long getConsumerCount()
-    {
-        return _serverSession.getConsumerCount();
-    }
-
-    @Override
     public long getTxnRejects()
     {
         return _serverSession.getTxnRejects();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 937e1d0..dd562b9 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -38,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,12 +52,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -76,7 +74,19 @@ import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.RoutingResult;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.ProtocolVersion;
 import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
 import org.apache.qpid.server.protocol.v0_8.transport.*;
@@ -178,7 +188,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     private final ClientDeliveryMethod _clientDeliveryMethod;
 
     private final ImmediateAction _immediateAction = new ImmediateAction();
-    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
     private Session<?> _modelObject;
     private long _blockTime;
     private long _blockingTimeout;
@@ -737,17 +746,11 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
             for(MessageSource source : sources)
             {
-                MessageInstanceConsumer<ConsumerTarget_0_8> sub =
-                        source.addConsumer(target,
-                                           filterManager,
-                                           AMQMessage.class,
-                                           AMQShortString.toString(tag),
-                                           options, priority);
-                if (sub instanceof Consumer<?, ?>)
-                {
-                    final Consumer<?,ConsumerTarget_0_8> modelConsumer = (Consumer<?,ConsumerTarget_0_8>) sub;
-                    _consumers.add(modelConsumer);
-                }
+                source.addConsumer(target,
+                                   filterManager,
+                                   AMQMessage.class,
+                                   AMQShortString.toString(tag),
+                                   options, priority);
             }
             target.updateNotifyWorkDesired();
         }
@@ -777,16 +780,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         }
 
         ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
-        Collection<MessageInstanceConsumer> subs = target == null ? null : target.getConsumers();
-        if (subs != null)
+        if (target != null)
         {
-            for(MessageInstanceConsumer sub : subs)
-            {
-                if (sub instanceof Consumer<?,?>)
-                {
-                    _consumers.remove(sub);
-                }
-            }
             target.close();
             return true;
         }
@@ -1739,18 +1734,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
     }
 
     @Override
-    public long getConsumerCount()
-    {
-        return _tag2SubscriptionTargetMap.size();
-    }
-
-    @Override
-    public Collection<Consumer<?,ConsumerTarget_0_8>> getConsumers()
-    {
-        return Collections.unmodifiableCollection(_consumers);
-    }
-
-    @Override
     public long getTransactionStartTimeLong()
     {
         ServerTransaction serverTransaction = _transaction;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 15788ef..29f1a01 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,7 +41,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.Subject;
@@ -54,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.filter.SelectorParsingException;
@@ -66,14 +66,12 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -132,8 +130,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     private final AMQPConnection_1_0<?> _connection;
     private AtomicBoolean _closed = new AtomicBoolean();
 
-    private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
-
     private Session<?> _modelObject = this;
 
     private SessionState _sessionState;
@@ -1421,12 +1417,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     @Override
-    public long getConsumerCount()
-    {
-        return _consumers.size();
-    }
-
-    @Override
     public String toLogString()
     {
         final AMQPConnection<?> amqpConnection = getAMQPConnection();
@@ -1474,12 +1464,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     @Override
-    public Collection<Consumer<?, ConsumerTarget_1_0>> getConsumers()
-    {
-        return Collections.unmodifiableCollection(_consumers);
-    }
-
-    @Override
     public long getTransactionStartTimeLong()
     {
         return 0L;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org