You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/04 00:28:13 UTC
svn commit: r1564130 [2/2] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/main/java/org/apache/qpid/server/subscription/
broker-core/src/test/java/org/apache/qpid/server...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 3 23:28:12 2014
@@ -24,6 +24,7 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
@@ -33,8 +34,10 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -42,9 +45,10 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.EnumSet;
+
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -151,17 +155,24 @@ public class BasicGetMethodHandler imple
}
};
- Subscription sub;
+ SubscriptionTarget_0_8 target;
+ EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
+ Subscription.Option.SEES_REQUEUES);
if(acks)
{
- sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+ target = SubscriptionTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = SubscriptionTarget_0_8.createNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- queue.registerSubscription(sub,false);
+ Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
sub.flush();
queue.unregisterSubscription(sub);
return(!singleMessageCredit.hasCredit());
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Mon Feb 3 23:28:12 2014
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Set;
/**
@@ -47,6 +48,7 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
+ private SubscriptionTarget_0_8 _subscriptionTarget;
private Subscription _subscription;
private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
private void publishMessages(int count, boolean persistent) throws AMQException
{
- _queue.registerSubscription(_subscription,false);
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -178,7 +179,10 @@ public class AckTest extends QpidTestCas
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -202,7 +206,16 @@ public class AckTest extends QpidTestCas
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +231,13 @@ public class AckTest extends QpidTestCas
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -235,7 +254,15 @@ public class AckTest extends QpidTestCas
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -264,7 +291,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -290,7 +325,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -319,8 +362,12 @@ public class AckTest extends QpidTestCas
// Send 10 messages
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+ _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Subscription.Option.SEES_REQUEUES,
+ Subscription.Option.ACQUIRES));
+
final int msgCount = 1;
publishMessages(msgCount);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Mon Feb 3 23:28:12 2014
@@ -37,6 +37,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
@@ -105,9 +108,11 @@ public class ExtractResendAndRequeueTest
*/
private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
{
- Subscription subscription = new MockSubscription();
+ Subscription subscription = mock(Subscription.class);
+ when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+ when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
- // Aquire messages in subscription
+ // Acquire messages in subscription
for (QueueEntry entry : messageList)
{
entry.acquire(subscription);
@@ -157,7 +162,7 @@ public class ExtractResendAndRequeueTest
Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
// Close subscription
- subscription.close();
+ when(subscription.isClosed()).thenReturn(true);
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Feb 3 23:28:12 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,6 +111,7 @@ public class SendingLink_1_0 implements
QueueDestination qd = null;
AMQQueue queue = null;
+ EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
boolean noLocal = false;
@@ -173,14 +175,11 @@ public class SendingLink_1_0 implements
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
_target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
- _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class,
- source.getDistributionMode() != StdDistMode.COPY,
- source.getDistributionMode() != StdDistMode.COPY,
- getEndpoint().getName(),
- false,
- _target);
- _target.setSubscription(_subscription);
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
+ }
}
else if(destination instanceof ExchangeDestination)
@@ -373,26 +372,27 @@ public class SendingLink_1_0 implements
_target = new SubscriptionTarget_1_0(this, true);
- _subscription = new DelegatingSubscription<SubscriptionTarget_1_0>(messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class,
- true,
- true,
- getEndpoint().getName(),
- false,
- _target);
- _target.setSubscription(_subscription);
+ options.add(Subscription.Option.ACQUIRES);
+ options.add(Subscription.Option.SEES_REQUEUES);
}
- if(_subscription != null)
+ if(_target != null)
{
+ if(noLocal)
+ {
+ options.add(Subscription.Option.NO_LOCAL);
+ }
+
+
_subscription.setNoLocal(noLocal);
try
{
-
- queue.registerSubscription(_subscription, false);
+ _subscription = queue.registerSubscription(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, getEndpoint().getName(), options);
}
catch (AMQException e)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java Mon Feb 3 23:28:12 2014
@@ -74,11 +74,6 @@ class SubscriptionTarget_1_0 extends Abs
_acquires = acquires;
}
- public void setSubscription(Subscription sub)
- {
- _subscription = sub;
- }
-
public Subscription getSubscription()
{
return _subscription;
@@ -505,6 +500,18 @@ class SubscriptionTarget_1_0 extends Abs
}
@Override
+ public void subscriptionRegistered(final Subscription sub)
+ {
+ _subscription = sub;
+ }
+
+ @Override
+ public void subscriptionRemoved(final Subscription sub)
+ {
+
+ }
+
+ @Override
public long getUnacknowledgedBytes()
{
// TODO
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org