You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 11:29:57 UTC
svn commit: r1564703 [4/4] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/logging/actors/
broker-core/src/main/java/org/apache/qpid/s...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java Wed Feb 5 10:29:55 2014
@@ -23,10 +23,10 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
public interface ClientDeliveryMethod
{
- void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props,
+ void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
final long deliveryTag) throws AMQException;
}
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Feb 5 10:29:55 2014
@@ -33,8 +33,8 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.Atomi
* Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
*/
-public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
@@ -70,23 +70,23 @@ public abstract class SubscriptionTarget
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
- private Subscription _subscription;
+ private Consumer _consumer;
- public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager) throws AMQException
{
- return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- static final class BrowserSubscription extends SubscriptionTarget_0_8
+ static final class BrowserConsumer extends ConsumerTarget_0_8
{
- public BrowserSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public BrowserConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag,
@@ -124,31 +124,31 @@ public abstract class SubscriptionTarget
}
- public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager) throws AMQException
{
- return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod) throws AMQException
{
- return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+ return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
}
- public static class NoAckSubscription extends SubscriptionTarget_0_8
+ public static class NoAckConsumer extends ConsumerTarget_0_8
{
private final AutoCommitTransaction _txn;
- public NoAckSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public NoAckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -221,13 +221,13 @@ public abstract class SubscriptionTarget
/**
* NoAck Subscription for use with BasicGet method.
*/
- public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription
+ public static final class GetNoAckConsumer extends NoAckConsumer
{
- public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
- AMQShortString consumerTag, FieldTable filters,
- boolean noLocal, FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+ AMQShortString consumerTag, FieldTable filters,
+ boolean noLocal, FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -241,32 +241,32 @@ public abstract class SubscriptionTarget
}
- public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager)
throws AMQException
{
- return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+ return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
- public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+ public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString consumerTag, FieldTable filters,
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
throws AMQException
{
- return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+ return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
}
- static final class AckSubscription extends SubscriptionTarget_0_8
+ static final class AckConsumer extends ConsumerTarget_0_8
{
- public AckSubscription(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public AckConsumer(AMQChannel channel,
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -305,7 +305,7 @@ public abstract class SubscriptionTarget
}
- private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class);
+ private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class);
private final AMQChannel _channel;
@@ -320,12 +320,12 @@ public abstract class SubscriptionTarget
- public SubscriptionTarget_0_8(AMQChannel channel,
- AMQShortString consumerTag,
- FieldTable arguments,
- FlowCreditManager creditManager,
- ClientDeliveryMethod deliveryMethod,
- RecordDeliveryMethod recordMethod)
+ public ConsumerTarget_0_8(AMQChannel channel,
+ AMQShortString consumerTag,
+ FieldTable arguments,
+ FlowCreditManager creditManager,
+ ClientDeliveryMethod deliveryMethod,
+ RecordDeliveryMethod recordMethod)
throws AMQException
{
super(State.ACTIVE);
@@ -357,20 +357,20 @@ public abstract class SubscriptionTarget
}
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
public AMQSessionModel getSessionModel()
@@ -417,7 +417,7 @@ public abstract class SubscriptionTarget
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -433,7 +433,7 @@ public abstract class SubscriptionTarget
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -488,14 +488,14 @@ public abstract class SubscriptionTarget
protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
throws AMQException
{
- _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag);
+ _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
}
protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
{
- _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
+ _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Wed Feb 5 10:29:55 2014
@@ -24,11 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -53,11 +49,11 @@ public class ExtractResendAndRequeue imp
{
message.setRedelivered();
- final Subscription subscription = message.getDeliveredSubscription();
- if (subscription != null)
+ final Consumer consumer = message.getDeliveredConsumer();
+ if (consumer != null)
{
// Consumer exists
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
_msgToResend.put(deliveryTag, message);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java Wed Feb 5 10:29:55 2014
@@ -21,9 +21,9 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
public interface RecordDeliveryMethod
{
- void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag);
+ void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb 5 10:29:55 2014
@@ -156,14 +156,14 @@ public class BasicConsumeMethodHandler i
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (AMQQueue.ExistingExclusiveConsumer e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Wed Feb 5 10:29:55 2014
@@ -38,14 +38,13 @@ import org.apache.qpid.server.flow.Messa
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.EnumSet;
@@ -133,7 +132,7 @@ public class BasicGetMethodHandler imple
{
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message, final
+ public void deliverToClient(final Consumer sub, final ServerMessage message, final
InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -150,30 +149,30 @@ public class BasicGetMethodHandler imple
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
- SubscriptionTarget_0_8 target;
- EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES);
if(acks)
{
- target = SubscriptionTarget_0_8.createAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- target = SubscriptionTarget_0_8.createNoAckTarget(channel,
- AMQShortString.EMPTY_STRING, null,
- singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
+ Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
sub.flush();
sub.close();
return(!singleMessageCredit.hasCredit());
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Feb 5 10:29:55 2014
@@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.Limit
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,8 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private SubscriptionTarget_0_8 _subscriptionTarget;
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -180,10 +179,13 @@ public class AckTest extends QpidTestCas
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -207,16 +209,16 @@ public class AckTest extends QpidTestCas
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget,
- null,
- AMQMessage.class,
- DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -233,12 +235,12 @@ public class AckTest extends QpidTestCas
{
// false arg means no acks expected
- _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -256,13 +258,13 @@ public class AckTest extends QpidTestCas
public void testSingleAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -293,13 +295,13 @@ public class AckTest extends QpidTestCas
public void testMultiAckReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -327,13 +329,13 @@ public class AckTest extends QpidTestCas
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
- DEFAULT_CONSUMER_TAG,
- null,
- new LimitlessCreditManager());
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
@@ -364,15 +366,15 @@ public class AckTest extends QpidTestCas
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 1;
publishMessages(msgCount);
- _subscription.externalStateChange();
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Wed Feb 5 10:29:55 2014
@@ -27,9 +27,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -65,7 +63,7 @@ public class ExtractResendAndRequeueTest
private static final int INITIAL_MSG_COUNT = 10;
private AMQQueue _queue;
private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
- private Subscription _subscription;
+ private Consumer _consumer;
private boolean _queueDeleted;
@Override
@@ -76,8 +74,8 @@ public class ExtractResendAndRequeueTest
_queue = mock(AMQQueue.class);
when(_queue.getName()).thenReturn(getName());
when(_queue.isDeleted()).thenReturn(_queueDeleted);
- _subscription = mock(Subscription.class);
- when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+ _consumer = mock(Consumer.class);
+ when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
long id = 0;
@@ -123,7 +121,7 @@ public class ExtractResendAndRequeueTest
// Acquire messages in subscription
for(QueueEntry entry : messageList)
{
- when(entry.getDeliveredSubscription()).thenReturn(_subscription);
+ when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}
}
@@ -168,7 +166,7 @@ public class ExtractResendAndRequeueTest
acquireMessages(_referenceList);
// Close subscription
- when(_subscription.isClosed()).thenReturn(true);
+ when(_consumer.isClosed()).thenReturn(true);
final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Wed Feb 5 10:29:55 2014
@@ -49,7 +49,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -245,7 +245,7 @@ public class InternalTestProtocolSession
@Override
- public void deliverToClient(Subscription sub, ServerMessage message,
+ public void deliverToClient(Consumer sub, ServerMessage message,
InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb 5 10:29:55 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -131,7 +131,7 @@ public class QueueBrowserUsesNoAckTest e
// indicate we are using the prefetch credit. i.e. using acks not No-Ack
assertTrue("The subscription has been suspended",
!getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
+ .equals(Consumer.State.SUSPENDED));
}
private void checkStoreContents(int messageCount)
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Feb 5 10:29:55 2014
@@ -43,15 +43,14 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import java.nio.ByteBuffer;
import java.util.List;
-class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
+class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
private SendingLink_1_0 _link;
@@ -61,10 +60,10 @@ class SubscriptionTarget_1_0 extends Abs
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
private final SectionEncoder _sectionEncoder;
- private Subscription _subscription;
+ private Consumer _consumer;
- public SubscriptionTarget_1_0(final SendingLink_1_0 link,
- boolean acquires)
+ public ConsumerTarget_1_0(final SendingLink_1_0 link,
+ boolean acquires)
{
super(State.SUSPENDED);
_link = link;
@@ -73,9 +72,9 @@ class SubscriptionTarget_1_0 extends Abs
_acquires = acquires;
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
private SendingLinkEndpoint getEndpoint()
@@ -94,7 +93,7 @@ class SubscriptionTarget_1_0 extends Abs
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -109,7 +108,7 @@ class SubscriptionTarget_1_0 extends Abs
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -255,7 +254,7 @@ class SubscriptionTarget_1_0 extends Abs
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getSubscription()))
+ if(queueEntry.isAcquiredBy(getConsumer()))
{
queueEntry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -385,7 +384,7 @@ class SubscriptionTarget_1_0 extends Abs
public void postCommit()
{
- if(_queueEntry.isAcquiredBy(getSubscription()))
+ if(_queueEntry.isAcquiredBy(getConsumer()))
{
_queueEntry.delete();
}
@@ -499,13 +498,13 @@ class SubscriptionTarget_1_0 extends Abs
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb 5 10:29:55 2014
@@ -68,8 +68,7 @@ import org.apache.qpid.server.filter.Sim
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -82,8 +81,8 @@ public class SendingLink_1_0 implements
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription _subscription;
- private SubscriptionTarget_1_0 _target;
+ private Consumer _consumer;
+ private ConsumerTarget_1_0 _target;
private boolean _draining;
private final Map<Binary, MessageInstance> _unsettledMap =
@@ -112,7 +111,7 @@ public class SendingLink_1_0 implements
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
boolean noLocal = false;
@@ -175,11 +174,11 @@ public class SendingLink_1_0 implements
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
if(source.getDistributionMode() != StdDistMode.COPY)
{
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
}
@@ -376,9 +375,9 @@ public class SendingLink_1_0 implements
}
- _target = new SubscriptionTarget_1_0(this, true);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ _target = new ConsumerTarget_1_0(this, true);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
else
@@ -390,18 +389,18 @@ public class SendingLink_1_0 implements
{
if(noLocal)
{
- options.add(Subscription.Option.NO_LOCAL);
+ options.add(Consumer.Option.NO_LOCAL);
}
- _subscription.setNoLocal(noLocal);
+ _consumer.setNoLocal(noLocal);
try
{
- _subscription = _queue.registerSubscription(_target,
- messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class, getEndpoint().getName(), options);
+ _consumer = _queue.addConsumer(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
@@ -428,7 +427,7 @@ public class SendingLink_1_0 implements
try
{
- _subscription.close();
+ _consumer.close();
}
catch (AMQException e)
@@ -622,7 +621,7 @@ public class SendingLink_1_0 implements
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
- if(_subscription.isActive())
+ if(_consumer.isActive())
{
_target.suspend();
}
@@ -653,7 +652,7 @@ public class SendingLink_1_0 implements
if(outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -673,7 +672,7 @@ public class SendingLink_1_0 implements
else if(outcome instanceof Released)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java Wed Feb 5 10:29:55 2014
@@ -20,7 +20,6 @@ package org.apache.qpid.server.managemen
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -327,8 +326,8 @@ public class MessageServlet extends Abst
: entry.isAcquired()
? "Acquired"
: "");
- final Subscription deliveredSubscription = entry.getDeliveredSubscription();
- object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID());
+ final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+ object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId());
ServerMessage message = entry.getMessage();
if(message != null)
Copied: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java Wed Feb 5 10:29:55 2014
@@ -46,7 +46,7 @@ import java.util.List;
* SUB-1002 : Close
* SUB-1003 : State : <state>
*/
-public class SubscriptionLoggingTest extends AbstractTestLogging
+public class ConsumerLoggingTest extends AbstractTestLogging
{
static final String SUB_PREFIX = "SUB-";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org