You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/21 09:36:29 UTC
qpid-broker-j git commit: QPID-7812: Sessions#getConsumerCount returns number of consumers on all protocols
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 85abb468e -> 92b79cc92
QPID-7812: Sessions#getConsumerCount returns number of consumers on all protocols
Also removed the dead consumer collections from the session implementations.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/92b79cc9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/92b79cc9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/92b79cc9
Branch: refs/heads/master
Commit: 92b79cc9255bf70400c09fc3389feb066265c5bb
Parents: 85abb46
Author: Keith Wall <kw...@apache.org>
Authored: Tue Jun 20 17:05:23 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jun 20 17:05:29 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/server/model/Session.java | 2 -
.../apache/qpid/server/queue/AbstractQueue.java | 33 ++++++++---
.../apache/qpid/server/session/AMQPSession.java | 10 ++--
.../server/session/AbstractAMQPSession.java | 24 +++++++-
.../server/protocol/v0_10/ServerSession.java | 40 +++++--------
.../protocol/v0_10/ServerSessionDelegate.java | 13 ++---
.../server/protocol/v0_10/Session_0_10.java | 14 -----
.../qpid/server/protocol/v0_8/AMQChannel.java | 59 +++++++-------------
.../qpid/server/protocol/v1_0/Session_1_0.java | 18 +-----
9 files changed, 90 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index dbc87be..271cf84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -56,8 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
boolean isProducerFlowBlocked();
- Collection<? extends Consumer> getConsumers();
-
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.COUNT, label = "Consumers")
long getConsumerCount();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2a9d3b5..1ab48e2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -859,17 +859,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
Object exclusiveOwner = _exclusiveOwner;
+ final AMQPSession<?, T> session = target.getSession();
switch(_exclusive)
{
case CONNECTION:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSession().getAMQPConnection();
- addExclusivityConstraint(target.getSession().getAMQPConnection());
+ exclusiveOwner = session.getAMQPConnection();
+ addExclusivityConstraint(session.getAMQPConnection());
}
else
{
- if(exclusiveOwner != target.getSession().getAMQPConnection())
+ if(exclusiveOwner != session.getAMQPConnection())
{
throw new ConsumerAccessRefused();
}
@@ -878,12 +879,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
case SESSION:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSession();
- addExclusivityConstraint(target.getSession());
+ exclusiveOwner = session;
+ addExclusivityConstraint(session);
}
else
{
- if(exclusiveOwner != target.getSession())
+ if(exclusiveOwner != session)
{
throw new ConsumerAccessRefused();
}
@@ -896,7 +897,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
break;
case PRINCIPAL:
- Principal currentAuthorizedPrincipal = target.getSession().getAMQPConnection().getAuthorizedPrincipal();
+ Principal currentAuthorizedPrincipal = session.getAMQPConnection().getAuthorizedPrincipal();
if(exclusiveOwner == null)
{
exclusiveOwner = currentAuthorizedPrincipal;
@@ -912,11 +913,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
case CONTAINER:
if(exclusiveOwner == null)
{
- exclusiveOwner = target.getSession().getAMQPConnection().getRemoteContainerName();
+ exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
}
else
{
- if(!exclusiveOwner.equals(target.getSession().getAMQPConnection().getRemoteContainerName()))
+ if(!exclusiveOwner.equals(session.getAMQPConnection().getRemoteContainerName()))
{
throw new ConsumerAccessRefused();
}
@@ -1016,6 +1017,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
+ session.incConsumerCount();
+ addChangeListener(new AbstractConfigurationChangeListener()
+ {
+ @Override
+ public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
+ {
+ if (child.equals(consumer))
+ {
+ session.decConsumerCount();
+ removeChangeListener(this);
+ }
+ }
+ });
+
return consumer;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 1cbf643..697a504 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -20,20 +20,16 @@
*/
package org.apache.qpid.server.session;
-import java.util.Collection;
-import java.util.UUID;
-
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.transport.network.Ticker;
+import org.apache.qpid.server.util.Deletable;
public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSession<S, X>,
X extends ConsumerTarget<X>> extends Session<S>,
@@ -70,7 +66,9 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio
long getConsumerCount();
- Collection<Consumer<?,X>> getConsumers();
+ void incConsumerCount();
+
+ void decConsumerCount();
/**
* Return the time the current transaction started.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index fa9f22a..09aefcc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -29,11 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.Subject;
import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -60,10 +60,9 @@ import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.TransactionTimeoutTicker;
+import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.FutureHelper;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
-import org.apache.qpid.server.transport.network.Ticker;
public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
X extends ConsumerTarget<X>>
@@ -84,6 +83,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
protected final LogSubject _logSubject;
protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
+ private final AtomicInteger _consumerCount = new AtomicInteger();
protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
private Iterator<AbstractConsumerTarget> _processPendingIterator;
@@ -406,6 +406,24 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
}
}
+ @Override
+ public final long getConsumerCount()
+ {
+ return _consumerCount.get();
+ }
+
+ @Override
+ public final void incConsumerCount()
+ {
+ _consumerCount.incrementAndGet();
+ }
+
+ @Override
+ public final void decConsumerCount()
+ {
+ _consumerCount.decrementAndGet();
+ }
+
protected abstract void updateBlockedStateIfNecessary();
public abstract boolean isClosing();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index cd27cbc..7da0d0f 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -56,7 +56,6 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,16 +78,25 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_10.transport.*;
-import org.apache.qpid.server.protocol.v0_10.transport.Xid;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.protocol.v0_10.transport.Frame;
-import org.apache.qpid.server.txn.*;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.DistributedTransaction;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -152,8 +160,6 @@ public class ServerSession extends SessionInvoker
private final AtomicLong _txnCount = new AtomicLong(0);
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>();
-
private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>();
public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry)
@@ -1202,15 +1208,6 @@ public class ServerSession extends SessionInvoker
}
- public void register(final MessageInstanceConsumer<ConsumerTarget_0_10> messageInstanceConsumer)
- {
- if(messageInstanceConsumer instanceof Consumer<?,?>)
- {
- final Consumer<?,ConsumerTarget_0_10> consumer = (Consumer<?,ConsumerTarget_0_10>) messageInstanceConsumer;
- _consumers.add(consumer);
- }
- }
-
public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
@@ -1727,17 +1724,6 @@ public class ServerSession extends SessionInvoker
}
}
- public long getConsumerCount()
- {
- return _subscriptions.values().size();
- }
-
- public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
- {
-
- return Collections.unmodifiableCollection(_consumers);
- }
-
public void setModelObject(final Session_0_10 session)
{
_modelObject = session;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1c85614..8c3f8c9 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -354,13 +354,12 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
}
for(MessageSource source : sources)
{
- ((ServerSession) session).register(
- source.addConsumer(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options,
- priority));
+ source.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options,
+ priority);
}
target.updateNotifyWorkDesired();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 649d290..45a6500 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -21,14 +21,12 @@
package org.apache.qpid.server.protocol.v0_10;
import java.security.AccessControlContext;
-import java.util.Collection;
import java.util.List;
import javax.security.auth.Subject;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.session.AbstractAMQPSession;
@@ -109,18 +107,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg
}
@Override
- public Collection<Consumer<?, ConsumerTarget_0_10>> getConsumers()
- {
- return _serverSession.getConsumers();
- }
-
- @Override
- public long getConsumerCount()
- {
- return _serverSession.getConsumerCount();
- }
-
- @Override
public long getTxnRejects()
{
return _serverSession.getTxnRejects();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 937e1d0..dd562b9 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -38,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -53,12 +52,11 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
-import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -76,7 +74,19 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.UnacknowledgedMessageMap.Visitor;
import org.apache.qpid.server.protocol.v0_8.transport.*;
@@ -178,7 +188,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private final ClientDeliveryMethod _clientDeliveryMethod;
private final ImmediateAction _immediateAction = new ImmediateAction();
- private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_8>> _consumers = new CopyOnWriteArrayList<>();
private Session<?> _modelObject;
private long _blockTime;
private long _blockingTimeout;
@@ -737,17 +746,11 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
for(MessageSource source : sources)
{
- MessageInstanceConsumer<ConsumerTarget_0_8> sub =
- source.addConsumer(target,
- filterManager,
- AMQMessage.class,
- AMQShortString.toString(tag),
- options, priority);
- if (sub instanceof Consumer<?, ?>)
- {
- final Consumer<?,ConsumerTarget_0_8> modelConsumer = (Consumer<?,ConsumerTarget_0_8>) sub;
- _consumers.add(modelConsumer);
- }
+ source.addConsumer(target,
+ filterManager,
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options, priority);
}
target.updateNotifyWorkDesired();
}
@@ -777,16 +780,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Collection<MessageInstanceConsumer> subs = target == null ? null : target.getConsumers();
- if (subs != null)
+ if (target != null)
{
- for(MessageInstanceConsumer sub : subs)
- {
- if (sub instanceof Consumer<?,?>)
- {
- _consumers.remove(sub);
- }
- }
target.close();
return true;
}
@@ -1739,18 +1734,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
@Override
- public long getConsumerCount()
- {
- return _tag2SubscriptionTargetMap.size();
- }
-
- @Override
- public Collection<Consumer<?,ConsumerTarget_0_8>> getConsumers()
- {
- return Collections.unmodifiableCollection(_consumers);
- }
-
- @Override
public long getTransactionStartTimeLong()
{
ServerTransaction serverTransaction = _transaction;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/92b79cc9/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 15788ef..29f1a01 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,7 +41,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
@@ -54,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.filter.SelectorParsingException;
@@ -66,14 +66,12 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -132,8 +130,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private final AMQPConnection_1_0<?> _connection;
private AtomicBoolean _closed = new AtomicBoolean();
- private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_1_0>> _consumers = new CopyOnWriteArrayList<>();
-
private Session<?> _modelObject = this;
private SessionState _sessionState;
@@ -1421,12 +1417,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
@Override
- public long getConsumerCount()
- {
- return _consumers.size();
- }
-
- @Override
public String toLogString()
{
final AMQPConnection<?> amqpConnection = getAMQPConnection();
@@ -1474,12 +1464,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
@Override
- public Collection<Consumer<?, ConsumerTarget_1_0>> getConsumers()
- {
- return Collections.unmodifiableCollection(_consumers);
- }
-
- @Override
public long getTransactionStartTimeLong()
{
return 0L;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org