You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 10:17:56 UTC
svn commit: r1772514 [2/2] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/filter/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/or...
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Sun Dec 4 10:17:55 2016
@@ -19,19 +19,17 @@
package org.apache.qpid.server.virtualhost;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.EnumSet;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -53,12 +51,12 @@ public class VirtualHostPropertiesNodeTe
public void testAddConsumer() throws Exception
{
- final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+ final EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
- ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
- final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+ MessageInstanceConsumer consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+ final MessageContainer messageContainer = consumer.pullMessage();
assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Dec 4 10:17:55 2016
@@ -31,12 +31,12 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
@@ -186,7 +186,7 @@ public class ConsumerTarget_0_10 extends
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -409,7 +409,7 @@ public class ConsumerTarget_0_10 extends
});
}
- void reject(final ConsumerImpl consumer, final MessageInstance entry)
+ void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
{
entry.setRedelivered();
if (entry.makeAcquisitionUnstealable(consumer))
@@ -418,7 +418,7 @@ public class ConsumerTarget_0_10 extends
}
}
- void release(final ConsumerImpl consumer,
+ void release(final MessageInstanceConsumer consumer,
final MessageInstance entry,
final boolean setRedelivered)
{
@@ -442,7 +442,7 @@ public class ConsumerTarget_0_10 extends
}
}
- protected void sendToDLQOrDiscard(final ConsumerImpl consumer, MessageInstance entry)
+ protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
{
final ServerMessage msg = entry.getMessage();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Sun Dec 4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -34,11 +34,11 @@ class ExplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
private final ConsumerTarget_0_10 _target;
- private final ConsumerImpl _consumer;
+ private final MessageInstanceConsumer _consumer;
public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
ConsumerTarget_0_10 target,
- final ConsumerImpl consumer)
+ final MessageInstanceConsumer consumer)
{
_entry = entry;
_target = target;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Sun Dec 4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
@@ -33,11 +33,11 @@ class ImplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
private final ConsumerTarget_0_10 _target;
- private final ConsumerImpl _consumer;
+ private final MessageInstanceConsumer _consumer;
public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
ConsumerTarget_0_10 target,
- final ConsumerImpl consumer)
+ final MessageInstanceConsumer consumer)
{
_entry = entry;
_target = target;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Sun Dec 4 10:17:55 2016
@@ -21,8 +21,8 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
@@ -30,12 +30,12 @@ public class MessageAcceptCompletionList
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
- private final ConsumerImpl _consumer;
+ private final MessageInstanceConsumer _consumer;
private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
- final ConsumerImpl consumer,
+ final MessageInstanceConsumer consumer,
ServerSession session,
MessageInstance entry,
boolean restoreCredit)
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec 4 10:17:55 2016
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.LogMessage;
@@ -65,6 +64,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
@@ -541,7 +541,7 @@ public class ServerSession extends Sessi
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final ConsumerImpl consumer,
+ public void acknowledge(final MessageInstanceConsumer consumer,
final ConsumerTarget_0_10 target,
final MessageInstance entry)
{
@@ -578,11 +578,11 @@ public class ServerSession extends Sessi
}
- public void register(final ConsumerImpl consumerImpl)
+ public void register(final MessageInstanceConsumer messageInstanceConsumer)
{
- if(consumerImpl instanceof Consumer<?>)
+ if(messageInstanceConsumer instanceof Consumer<?>)
{
- final Consumer<?> consumer = (Consumer<?>) consumerImpl;
+ final Consumer<?> consumer = (Consumer<?>) messageInstanceConsumer;
_consumers.add(consumer);
consumer.addChangeListener(_consumerClosedListener);
consumerAdded(consumer);
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Dec 4 10:17:55 2016
@@ -39,7 +39,7 @@ import org.apache.qpid.bytebuffer.QpidBy
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.ErrorCodes;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
@@ -337,18 +337,18 @@ public class ServerSessionDelegate exten
((ServerSession)session).register(destination, target);
try
{
- EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+ EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
- options.add(ConsumerImpl.Option.ACQUIRES);
+ options.add(ConsumerOption.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- options.add(ConsumerImpl.Option.SEES_REQUEUES);
+ options.add(ConsumerOption.SEES_REQUEUES);
}
if(method.getExclusive())
{
- options.add(ConsumerImpl.Option.EXCLUSIVE);
+ options.add(ConsumerOption.EXCLUSIVE);
}
for(MessageSource source : sources)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec 4 10:17:55 2016
@@ -59,7 +59,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
@@ -78,6 +78,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
@@ -306,8 +307,8 @@ public class AMQChannel
new GetDeliveryMethod(queue);
ConsumerTarget_0_8 target;
- EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
- ConsumerImpl.Option.SEES_REQUEUES);
+ EnumSet<ConsumerOption> options = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES,
+ ConsumerOption.SEES_REQUEUES);
if (acks)
{
@@ -322,7 +323,7 @@ public class AMQChannel
INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
}
- ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+ MessageInstanceConsumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
target.updateNotifyWorkDesired();
target.sendNextMessage();
target.close();
@@ -710,7 +711,7 @@ public class AMQChannel
}
ConsumerTarget_0_8 target;
- EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+ EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
final boolean multiQueue = sources.size()>1;
if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
@@ -720,20 +721,20 @@ public class AMQChannel
else if(acks)
{
target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
- options.add(ConsumerImpl.Option.ACQUIRES);
- options.add(ConsumerImpl.Option.SEES_REQUEUES);
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
}
else
{
target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
- options.add(ConsumerImpl.Option.ACQUIRES);
- options.add(ConsumerImpl.Option.SEES_REQUEUES);
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
}
if(exclusive)
{
- options.add(ConsumerImpl.Option.EXCLUSIVE);
+ options.add(ConsumerOption.EXCLUSIVE);
}
@@ -817,7 +818,7 @@ public class AMQChannel
for(MessageSource source : sources)
{
- ConsumerImpl sub =
+ MessageInstanceConsumer sub =
source.addConsumer(target,
filterManager,
AMQMessage.class,
@@ -859,10 +860,10 @@ public class AMQChannel
}
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+ Collection<MessageInstanceConsumer> subs = target == null ? null : target.getConsumers();
if (subs != null)
{
- for(ConsumerImpl sub : subs)
+ for(MessageInstanceConsumer sub : subs)
{
if (sub instanceof Consumer<?>)
{
@@ -955,7 +956,7 @@ public class AMQChannel
*/
public void addUnacknowledgedMessage(MessageInstance entry,
long deliveryTag,
- ConsumerImpl consumer,
+ MessageInstanceConsumer consumer,
final boolean usesCredit)
{
if (_logger.isDebugEnabled())
@@ -1010,7 +1011,7 @@ public class AMQChannel
for (Map.Entry<Long, MessageConsumerAssociation> entry : copy.entrySet())
{
MessageInstance unacked = entry.getValue().getMessageInstance();
- ConsumerImpl consumer = entry.getValue().getConsumer();
+ MessageInstanceConsumer consumer = entry.getValue().getConsumer();
// Mark message redelivered
unacked.setRedelivered();
// here we wish to restore credit
@@ -1117,7 +1118,7 @@ public class AMQChannel
{
long deliveryTag = entry.getKey();
MessageInstance message = entry.getValue().getMessageInstance();
- ConsumerImpl consumer = entry.getValue().getConsumer();
+ MessageInstanceConsumer consumer = entry.getValue().getConsumer();
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
@@ -1142,7 +1143,7 @@ public class AMQChannel
{
long deliveryTag = entry.getKey();
MessageInstance message = entry.getValue().getMessageInstance();
- ConsumerImpl consumer = entry.getValue().getConsumer();
+ MessageInstanceConsumer consumer = entry.getValue().getConsumer();
//Amend the delivery counter as the client hasn't seen these messages yet.
message.decrementDeliveryCount();
@@ -1187,7 +1188,7 @@ public class AMQChannel
// may need to deliver queued messages
for (ConsumerTarget_0_8 s : getConsumerTargets())
{
- for(ConsumerImpl sub : s.getConsumers())
+ for(MessageInstanceConsumer sub : s.getConsumers())
{
sub.externalStateChange();
}
@@ -1273,7 +1274,7 @@ public class AMQChannel
for(MessageConsumerAssociation association : _resendList)
{
final MessageInstance messageInstance = association.getMessageInstance();
- final ConsumerImpl consumer = association.getConsumer();
+ final MessageInstanceConsumer consumer = association.getConsumer();
if (consumer.isClosed())
{
messageInstance.release(consumer);
@@ -1298,7 +1299,7 @@ public class AMQChannel
_suspended.set(false);
for(ConsumerTarget_0_8 target : getConsumerTargets())
{
- for(ConsumerImpl sub : target.getConsumers())
+ for(MessageInstanceConsumer sub : target.getConsumers())
{
sub.externalStateChange();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sun Dec 4 10:17:55 2016
@@ -27,11 +27,11 @@ import org.apache.qpid.common.AMQPFilter
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -96,7 +96,7 @@ public abstract class ConsumerTarget_0_8
* @throws QpidException
*/
@Override
- public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -142,7 +142,7 @@ public abstract class ConsumerTarget_0_8
* @param batch
*/
@Override
- public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -245,7 +245,7 @@ public abstract class ConsumerTarget_0_8
* @param batch
*/
@Override
- public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
// put queue entry on a list and then notify the connection to read list.
@@ -397,7 +397,7 @@ public abstract class ConsumerTarget_0_8
updateNotifyWorkDesired();
}
- protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+ protected long sendToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
final InstanceProperties props,
final long deliveryTag)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java Sun Dec 4 10:17:55 2016
@@ -20,14 +20,14 @@
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
public interface MessageConsumerAssociation
{
MessageInstance getMessageInstance();
- ConsumerImpl getConsumer();
+ MessageInstanceConsumer getConsumer();
long getSize();
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sun Dec 4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import java.util.Map;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
public interface UnacknowledgedMessageMap
@@ -38,7 +38,7 @@ public interface UnacknowledgedMessageMa
void visit(Visitor visitor);
- void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
+ void add(long deliveryTag, MessageInstance message, final MessageInstanceConsumer consumer, final boolean usesCredit);
MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Sun Dec 4 10:17:55 2016
@@ -27,8 +27,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -36,10 +36,10 @@ class UnacknowledgedMessageMapImpl imple
private static final class MessageConsumerAssociationImpl implements MessageConsumerAssociation
{
private final MessageInstance _messageInstance;
- private final ConsumerImpl _consumer;
+ private final MessageInstanceConsumer _consumer;
private final boolean _usesCredit;
- private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
+ private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final MessageInstanceConsumer consumer, final boolean usesCredit)
{
_messageInstance = messageInstance;
_consumer = consumer;
@@ -53,7 +53,7 @@ class UnacknowledgedMessageMapImpl imple
}
@Override
- public ConsumerImpl getConsumer()
+ public MessageInstanceConsumer getConsumer()
{
return _consumer;
}
@@ -81,6 +81,7 @@ class UnacknowledgedMessageMapImpl imple
_creditRestorer = creditRestorer;
}
+ @Override
public void collect(long deliveryTag, boolean multiple, Map<Long, MessageConsumerAssociation> msgs)
{
if (multiple)
@@ -106,6 +107,7 @@ class UnacknowledgedMessageMapImpl imple
}
}
+ @Override
public MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit)
{
MessageConsumerAssociationImpl entry = _map.remove(deliveryTag);
@@ -120,6 +122,7 @@ class UnacknowledgedMessageMapImpl imple
return entry;
}
+ @Override
public void visit(Visitor visitor)
{
for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
@@ -129,7 +132,8 @@ class UnacknowledgedMessageMapImpl imple
visitor.visitComplete();
}
- public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
+ @Override
+ public void add(long deliveryTag, MessageInstance message, final MessageInstanceConsumer consumer, final boolean usesCredit)
{
if(_map.put(deliveryTag, new MessageConsumerAssociationImpl(message, consumer, usesCredit)) == null)
{
@@ -141,17 +145,20 @@ class UnacknowledgedMessageMapImpl imple
}
}
+ @Override
public int size()
{
return _size;
}
+ @Override
public MessageInstance get(long key)
{
MessageConsumerAssociation association = _map.get(key);
return association == null ? null : association.getMessageInstance();
}
+ @Override
public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple)
{
if(multiple)
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Sun Dec 4 10:17:55 2016
@@ -28,8 +28,8 @@ import java.util.Collection;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -45,7 +45,7 @@ public class UnacknowledgedMessageMapTes
return input.getMessageInstance();
}
};
- private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
+ private final MessageInstanceConsumer _consumer = mock(MessageInstanceConsumer.class);
public void testDeletedMessagesCantBeAcknowledged()
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sun Dec 4 10:17:55 2016
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -100,7 +100,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
- public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
{
// TODO
ServerMessage serverMessage = entry.getMessage();
@@ -330,16 +330,16 @@ class ConsumerTarget_1_0 extends Abstrac
private final MessageInstance _queueEntry;
private final Binary _deliveryTag;
- private final ConsumerImpl _consumer;
+ private final MessageInstanceConsumer _consumer;
- public DispositionAction(Binary tag, MessageInstance queueEntry, final ConsumerImpl consumer)
+ public DispositionAction(Binary tag, MessageInstance queueEntry, final MessageInstanceConsumer consumer)
{
_deliveryTag = tag;
_queueEntry = queueEntry;
_consumer = consumer;
}
- public ConsumerImpl getConsumer()
+ public MessageInstanceConsumer getConsumer()
{
return _consumer;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sun Dec 4 10:17:55 2016
@@ -39,10 +39,11 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Binding;
@@ -84,7 +85,7 @@ public class SendingLink_1_0 implements
private NamedAddressSpace _addressSpace;
private SendingDestination _destination;
- private ConsumerImpl _consumer;
+ private MessageInstanceConsumer _consumer;
private ConsumerTarget_1_0 _target;
private boolean _draining;
@@ -113,7 +114,7 @@ public class SendingLink_1_0 implements
_durability = source.getDurable();
QueueDestination qd = null;
- EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+ EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
boolean noLocal = false;
@@ -169,8 +170,8 @@ public class SendingLink_1_0 implements
_target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
if(source.getDistributionMode() != StdDistMode.COPY)
{
- options.add(ConsumerImpl.Option.ACQUIRES);
- options.add(ConsumerImpl.Option.SEES_REQUEUES);
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
}
}
@@ -329,8 +330,8 @@ public class SendingLink_1_0 implements
_target = new ConsumerTarget_1_0(this, true);
- options.add(ConsumerImpl.Option.ACQUIRES);
- options.add(ConsumerImpl.Option.SEES_REQUEUES);
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
}
else
@@ -342,13 +343,13 @@ public class SendingLink_1_0 implements
{
if(noLocal)
{
- options.add(ConsumerImpl.Option.NO_LOCAL);
+ options.add(ConsumerOption.NO_LOCAL);
}
if(_durability == TerminusDurability.CONFIGURATION ||
_durability == TerminusDurability.UNSETTLED_STATE )
{
- options.add(ConsumerImpl.Option.DURABLE);
+ options.add(ConsumerOption.DURABLE);
}
try
@@ -692,7 +693,7 @@ public class SendingLink_1_0 implements
return _addressSpace;
}
- public ConsumerImpl getConsumer()
+ public MessageInstanceConsumer getConsumer()
{
return _consumer;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Dec 4 10:17:55 2016
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
@@ -58,6 +57,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -1084,7 +1084,7 @@ public class Session_1_0 implements AMQS
private void registerConsumer(final SendingLink_1_0 link)
{
- ConsumerImpl consumer = link.getConsumer();
+ MessageInstanceConsumer consumer = link.getConsumer();
if(consumer instanceof Consumer<?>)
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec 4 10:17:55 2016
@@ -50,7 +50,7 @@ import javax.security.auth.Subject;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
@@ -58,6 +58,7 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -1434,7 +1435,7 @@ class ManagementNode implements MessageS
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<ConsumerImpl.Option> options,
+ final EnumSet<ConsumerOption> options,
final Integer priority)
{
@@ -1543,7 +1544,7 @@ class ManagementNode implements MessageS
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public MessageInstanceConsumer getAcquiringConsumer()
{
return null;
}
@@ -1555,13 +1556,13 @@ class ManagementNode implements MessageS
}
@Override
- public boolean isAcquiredBy(final ConsumerImpl consumer)
+ public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
{
return false;
}
@Override
- public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -1579,19 +1580,13 @@ class ManagementNode implements MessageS
}
@Override
- public ConsumerImpl getDeliveredConsumer()
- {
- return null;
- }
-
- @Override
public void reject()
{
}
@Override
- public boolean isRejectedBy(final ConsumerImpl consumer)
+ public boolean isRejectedBy(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -1609,13 +1604,13 @@ class ManagementNode implements MessageS
}
@Override
- public boolean acquire(final ConsumerImpl sub)
+ public boolean acquire(final MessageInstanceConsumer sub)
{
return false;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -1671,7 +1666,7 @@ class ManagementNode implements MessageS
}
@Override
- public void release(final ConsumerImpl release)
+ public void release(final MessageInstanceConsumer release)
{
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec 4 10:17:55 2016
@@ -26,29 +26,28 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-class ManagementNodeConsumer implements ConsumerImpl, MessageDestination
+class ManagementNodeConsumer implements MessageInstanceConsumer, MessageDestination
{
- private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
private final ManagementNode _managementNode;
private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
private final ConsumerTarget _target;
private final String _name;
+ private final Object _identifier = new Object();
public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
@@ -68,7 +67,13 @@ class ManagementNodeConsumer implements
}
@Override
- public AbstractQueue.MessageContainer pullMessage()
+ public Object getIdentifier()
+ {
+ return _identifier;
+ }
+
+ @Override
+ public MessageContainer pullMessage()
{
if (!_queue.isEmpty())
{
@@ -77,7 +82,7 @@ class ManagementNodeConsumer implements
if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- return new AbstractQueue.MessageContainer(managementResponse, null, false);
+ return new MessageContainer(managementResponse, null, false);
}
}
return null;
@@ -92,55 +97,12 @@ class ManagementNodeConsumer implements
}
}
- @Override
- public long getBytesOut()
- {
- return 0;
- }
-
- @Override
- public long getMessagesOut()
- {
- return 0;
- }
-
- @Override
- public long getUnacknowledgedBytes()
- {
- return 0;
- }
-
- @Override
- public long getUnacknowledgedMessages()
- {
- return 0;
- }
-
- @Override
- public AMQSessionModel getSessionModel()
+ AMQSessionModel getSessionModel()
{
return _target.getSessionModel();
}
@Override
- public MessageSource getMessageSource()
- {
- return _managementNode;
- }
-
- @Override
- public long getConsumerNumber()
- {
- return _id;
- }
-
- @Override
- public boolean isSuspended()
- {
- return false;
- }
-
- @Override
public boolean isClosed()
{
return false;
@@ -153,24 +115,11 @@ class ManagementNodeConsumer implements
}
@Override
- public boolean seesRequeues()
- {
- return false;
- }
-
- @Override
public void close()
{
_managementNode.unregisterConsumer(this);
}
-
- @Override
- public boolean isActive()
- {
- return false;
- }
-
@Override
public NamedAddressSpace getAddressSpace()
{
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Sun Dec 4 10:17:55 2016
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -85,7 +85,7 @@ class ManagementResponse implements Mess
}
@Override
- public ConsumerImpl getAcquiringConsumer()
+ public ManagementNodeConsumer getAcquiringConsumer()
{
return _consumer;
}
@@ -97,13 +97,13 @@ class ManagementResponse implements Mess
}
@Override
- public boolean isAcquiredBy(final ConsumerImpl consumer)
+ public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
{
return consumer == _consumer && !isDeleted();
}
@Override
- public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
{
return consumer == _consumer;
}
@@ -121,19 +121,13 @@ class ManagementResponse implements Mess
}
@Override
- public ManagementNodeConsumer getDeliveredConsumer()
- {
- return isDeleted() ? null : _consumer;
- }
-
- @Override
public void reject()
{
delete();
}
@Override
- public boolean isRejectedBy(final ConsumerImpl consumer)
+ public boolean isRejectedBy(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -151,13 +145,13 @@ class ManagementResponse implements Mess
}
@Override
- public boolean acquire(final ConsumerImpl sub)
+ public boolean acquire(final MessageInstanceConsumer sub)
{
return false;
}
@Override
- public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+ public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
{
return false;
}
@@ -213,7 +207,7 @@ class ManagementResponse implements Mess
}
@Override
- public void release(final ConsumerImpl release)
+ public void release(final MessageInstanceConsumer release)
{
release();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org