You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/05/19 01:05:36 UTC
svn commit: r945945 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
systests/src/main/java/org/apache/qpid/test/client/destination/
Author: rajith
Date: Tue May 18 23:05:36 2010
New Revision: 945945
URL: http://svn.apache.org/viewvc?rev=945945&view=rev
Log:
Implemented the feature described in QPID-2515
However, a few issues need to be ironed out - see the JIRA for these issues.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue May 18 23:05:36 2010
@@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQ
* USed to store the range of in tx messages
*/
private RangeSet _txRangeSet = new RangeSet();
- private int _txSize = 0;
+ private int _txSize = 0;
//--- constructors
/**
@@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQ
throws AMQException, FailoverException
{
boolean preAcquire;
+
+ long capacity = getCapacity(consumer.getDestination());
+
try
{
preAcquire = ( ! consumer.isNoConsume() &&
@@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQ
String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
- if (! prefetch())
+ if (capacity == 0)
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
}
@@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQ
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
- if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch))
+ if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
+ capacity,
Option.UNRELIABLE);
}
@@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQ
getCurrentException();
}
}
+
+ private long getCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getDestSyntax() == DestSyntax.ADDR &&
+ destination.getSourceLink().getCapacity() > 0)
+ {
+ capacity = destination.getSourceLink().getCapacity();
+ }
+ else if (prefetch())
+ {
+ capacity = getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
/**
* Create an 0_10 message producer
@@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQ
//only set if msg list is null
try
{
- if (! prefetch())
+ long capacity = getCapacity(consumer.getDestination());
+
+ if (capacity == 0)
{
if (consumer.getMessageListener() != null)
{
@@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQ
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
+ capacity,
Option.UNRELIABLE);
}
getQpidSession()
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue May 18 23:05:36 2010
@@ -19,6 +19,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
@@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 e
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
-
+
+ private long capacity = 0;
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 e
}
}
_isStarted = connection.started();
+
+ // Destination setting overrides connection defaults
+ if (destination.getDestSyntax() == DestSyntax.ADDR &&
+ destination.getSourceLink().getCapacity() > 0)
+ {
+ capacity = destination.getSourceLink().getCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+
}
@@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 e
}
if (messageOk)
{
- if (isMessageListenerSet() && ! getSession().prefetch())
+ if (isMessageListenerSet() && capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 e
}
// if we are syncrhonously waiting for a message
// and messages are not prefetched we then need to request another one
- if(! getSession().prefetch())
+ if(capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 e
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && ! getSession().prefetch())
+ if (messageListener != null && capacity == 0)
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 e
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (! getSession().prefetch())
+ if (capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
MessageCreditUnit.MESSAGE, 1,
@@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 e
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
{
+
_0_10session.getQpidSession().messageFlush
(getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
_0_10session.getQpidSession().sync();
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (getSession().prefetch())
+
+ if (capacity > 0)
{
_0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE,
- _0_10session.getAMQConnection().getMaxPrefetch(),
- Option.UNRELIABLE);
+ (getConsumerTagString(),
+ MessageCreditUnit.MESSAGE,
+ capacity,
+ Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (! getSession().prefetch())
+ if (capacity == 0)
{
_syncReceive.set(false);
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue May 18 23:05:36 2010
@@ -21,8 +21,12 @@ package org.apache.qpid.test.client.dest
*/
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTe
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import edu.emory.mathcs.backport.java.util.Collections;
-
public class AddressBasedDestinationTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
@@ -48,7 +50,7 @@ public class AddressBasedDestinationTest
public void setUp() throws Exception
{
super.setUp();
- _connection = getConnection();
+ _connection = getConnection() ;
_connection.start();
}
@@ -211,6 +213,7 @@ public class AddressBasedDestinationTest
"}, " +
"x-bindings: [{exchange : 'amq.direct', key : test}, " +
"{exchange : 'amq.fanout'}," +
+ "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
"{exchange : 'amq.topic', key : 'a.#'}" +
"]," +
@@ -236,7 +239,15 @@ public class AddressBasedDestinationTest
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getAddressName(),"a.#", null));
+ dest.getAddressName(),"a.#", null));
+
+ Map<String,Object> args = new HashMap<String,Object>();
+ args.put("x-match","any");
+ args.put("dep","sales");
+ args.put("loc","CA");
+ assertTrue("Queue not bound as expected",(
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ dest.getAddressName(),null, args));
}
@@ -273,8 +284,7 @@ public class AddressBasedDestinationTest
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
(AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
- dest.getQueueName(),"hello", Collections.emptyMap()));
-
+ dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
}
public void testBindQueueWithArgs() throws Exception
@@ -331,6 +341,53 @@ public class AddressBasedDestinationTest
dest.getAddressName(),null, a.getOptions()));
}
+ /**
+ * Test goal: Verifies the capacity property in address string is handled properly.
+ * Test strategy:
+ * Creates a destination with capacity 10.
+ * Creates consumer with client ack.
+ * Sends 15 messages to the queue, tries to receive 10.
+ * Tries to receive the 11th message and checks if its null.
+ *
+ * Since capacity is 10 and we haven't acked any messages,
+ * we should not have received the 11th.
+ *
+ * Acks the 10th message and verifies we receive the rest of the msgs.
+ */
+ public void testLinkCapacity() throws Exception
+ {
+ if (!isCppBroker())
+ {
+ _logger.info("Not C++ broker, exiting test");
+ return;
+ }
+
+ Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+
+ String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}";
+ AMQDestination dest = new AMQAnyDestination(addr);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+
+ for (int i=0; i< 15; i++)
+ {
+ prod.send(jmsSession.createTextMessage("msg" + i) );
+ }
+
+ for (int i=0; i< 9; i++)
+ {
+ cons.receive();
+ }
+ Message msg = cons.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Should have received the 10th message",msg);
+ assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
+ msg.acknowledge();
+ for (int i=11; i<16; i++)
+ {
+ assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
+ }
+ }
+
/*public void testBindQueueForXMLExchange() throws Exception
{
if (!isCppBroker())
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org
Re: svn commit: r945945 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/destination/
Posted by Martin Ritchie <ri...@apache.org>.
Hi Rajith, is testLinkCapacity testing some special CPP broker functionality?
+ if (!isCppBroker())
+ {
+ _logger.info("Not C++ broker, exiting test");
+ return;
+ }
If it is not for the Java broker it should be excluded in the
JavaExcludes file. I know that our current exclusion method is not the
best. Solutions on a postcard ;)
However, it appears to be testing 0-10 functionality so should it not
also run against the Java broker with 0-10?
Cheers
Martin
On 19 May 2010 00:05, <ra...@apache.org> wrote:
> Author: rajith
> Date: Tue May 18 23:05:36 2010
> New Revision: 945945
>
> URL: http://svn.apache.org/viewvc?rev=945945&view=rev
> Log:
> Implemented the feature described in QPID-2515
> However, a few issues need to be ironed out - see the JIRA for these issues.
>
> Modified:
> qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
> qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue May 18 23:05:36 2010
> @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQ
> * USed to store the range of in tx messages
> */
> private RangeSet _txRangeSet = new RangeSet();
> - private int _txSize = 0;
> + private int _txSize = 0;
> //--- constructors
>
> /**
> @@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQ
> throws AMQException, FailoverException
> {
> boolean preAcquire;
> +
> + long capacity = getCapacity(consumer.getDestination());
> +
> try
> {
> preAcquire = ( ! consumer.isNoConsume() &&
> @@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQ
>
> String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
>
> - if (! prefetch())
> + if (capacity == 0)
> {
> getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
> }
> @@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQ
> getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
> Option.UNRELIABLE);
>
> - if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch))
> + if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
> {
> // set the flow
> getQpidSession().messageFlow(consumerTag,
> MessageCreditUnit.MESSAGE,
> - getAMQConnection().getMaxPrefetch(),
> + capacity,
> Option.UNRELIABLE);
> }
>
> @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQ
> getCurrentException();
> }
> }
> +
> + private long getCapacity(AMQDestination destination)
> + {
> + long capacity = 0;
> + if (destination.getDestSyntax() == DestSyntax.ADDR &&
> + destination.getSourceLink().getCapacity() > 0)
> + {
> + capacity = destination.getSourceLink().getCapacity();
> + }
> + else if (prefetch())
> + {
> + capacity = getAMQConnection().getMaxPrefetch();
> + }
> + return capacity;
> + }
>
> /**
> * Create an 0_10 message producer
> @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQ
> //only set if msg list is null
> try
> {
> - if (! prefetch())
> + long capacity = getCapacity(consumer.getDestination());
> +
> + if (capacity == 0)
> {
> if (consumer.getMessageListener() != null)
> {
> @@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQ
> {
> getQpidSession()
> .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
> - getAMQConnection().getMaxPrefetch(),
> + capacity,
> Option.UNRELIABLE);
> }
> getQpidSession()
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue May 18 23:05:36 2010
> @@ -19,6 +19,7 @@ package org.apache.qpid.client;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> +import org.apache.qpid.client.AMQDestination.DestSyntax;
> import org.apache.qpid.client.message.*;
> import org.apache.qpid.client.protocol.AMQProtocolHandler;
> import org.apache.qpid.framing.FieldTable;
> @@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 e
> */
> private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
> private String _consumerTagString;
> -
> +
> + private long capacity = 0;
> +
> //--- constructor
> protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
> String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
> @@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 e
> }
> }
> _isStarted = connection.started();
> +
> + // Destination setting overrides connection defaults
> + if (destination.getDestSyntax() == DestSyntax.ADDR &&
> + destination.getSourceLink().getCapacity() > 0)
> + {
> + capacity = destination.getSourceLink().getCapacity();
> + }
> + else if (getSession().prefetch())
> + {
> + capacity = _0_10session.getAMQConnection().getMaxPrefetch();
> + }
> +
> }
>
>
> @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 e
> }
> if (messageOk)
> {
> - if (isMessageListenerSet() && ! getSession().prefetch())
> + if (isMessageListenerSet() && capacity == 0)
> {
> _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
> MessageCreditUnit.MESSAGE, 1,
> @@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 e
> }
> // if we are syncrhonously waiting for a message
> // and messages are not prefetched we then need to request another one
> - if(! getSession().prefetch())
> + if(capacity == 0)
> {
> _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
> MessageCreditUnit.MESSAGE, 1,
> @@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 e
> public void setMessageListener(final MessageListener messageListener) throws JMSException
> {
> super.setMessageListener(messageListener);
> - if (messageListener != null && ! getSession().prefetch())
> + if (messageListener != null && capacity == 0)
> {
> _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
> MessageCreditUnit.MESSAGE, 1,
> @@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 e
> */
> public Object getMessageFromQueue(long l) throws InterruptedException
> {
> - if (! getSession().prefetch())
> + if (capacity == 0)
> {
> _syncReceive.set(true);
> }
> - if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
> + if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
> {
> _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
> MessageCreditUnit.MESSAGE, 1,
> @@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 e
> Object o = super.getMessageFromQueue(l);
> if (o == null && _0_10session.isStarted())
> {
> +
> _0_10session.getQpidSession().messageFlush
> (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
> _0_10session.getQpidSession().sync();
> _0_10session.getQpidSession().messageFlow
> (getConsumerTagString(), MessageCreditUnit.BYTE,
> 0xFFFFFFFF, Option.UNRELIABLE);
> - if (getSession().prefetch())
> +
> + if (capacity > 0)
> {
> _0_10session.getQpidSession().messageFlow
> - (getConsumerTagString(), MessageCreditUnit.MESSAGE,
> - _0_10session.getAMQConnection().getMaxPrefetch(),
> - Option.UNRELIABLE);
> + (getConsumerTagString(),
> + MessageCreditUnit.MESSAGE,
> + capacity,
> + Option.UNRELIABLE);
> }
> _0_10session.syncDispatchQueue();
> o = super.getMessageFromQueue(-1);
> }
> - if (! getSession().prefetch())
> + if (capacity == 0)
> {
> _syncReceive.set(false);
> }
>
> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
> +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue May 18 23:05:36 2010
> @@ -21,8 +21,12 @@ package org.apache.qpid.test.client.dest
> */
>
>
> +import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.Map;
> import javax.jms.Connection;
> import javax.jms.JMSException;
> +import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> @@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTe
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> -import edu.emory.mathcs.backport.java.util.Collections;
> -
> public class AddressBasedDestinationTest extends QpidTestCase
> {
> private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
> @@ -48,7 +50,7 @@ public class AddressBasedDestinationTest
> public void setUp() throws Exception
> {
> super.setUp();
> - _connection = getConnection();
> + _connection = getConnection() ;
> _connection.start();
> }
>
> @@ -211,6 +213,7 @@ public class AddressBasedDestinationTest
> "}, " +
> "x-bindings: [{exchange : 'amq.direct', key : test}, " +
> "{exchange : 'amq.fanout'}," +
> + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
> "{exchange : 'amq.topic', key : 'a.#'}" +
> "]," +
>
> @@ -236,7 +239,15 @@ public class AddressBasedDestinationTest
>
> assertTrue("Queue not bound as expected",(
> (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
> - dest.getAddressName(),"a.#", null));
> + dest.getAddressName(),"a.#", null));
> +
> + Map<String,Object> args = new HashMap<String,Object>();
> + args.put("x-match","any");
> + args.put("dep","sales");
> + args.put("loc","CA");
> + assertTrue("Queue not bound as expected",(
> + (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
> + dest.getAddressName(),null, args));
>
> }
>
> @@ -273,8 +284,7 @@ public class AddressBasedDestinationTest
> // The existence of the queue is implicitly tested here
> assertTrue("Queue not bound as expected",(
> (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
> - dest.getQueueName(),"hello", Collections.emptyMap()));
> -
> + dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
> }
>
> public void testBindQueueWithArgs() throws Exception
> @@ -331,6 +341,53 @@ public class AddressBasedDestinationTest
> dest.getAddressName(),null, a.getOptions()));
> }
>
> + /**
> + * Test goal: Verifies the capacity property in address string is handled properly.
> + * Test strategy:
> + * Creates a destination with capacity 10.
> + * Creates consumer with client ack.
> + * Sends 15 messages to the queue, tries to receive 10.
> + * Tries to receive the 11th message and checks if its null.
> + *
> + * Since capacity is 10 and we haven't acked any messages,
> + * we should not have received the 11th.
> + *
> + * Acks the 10th message and verifies we receive the rest of the msgs.
> + */
> + public void testLinkCapacity() throws Exception
> + {
> + if (!isCppBroker())
> + {
> + _logger.info("Not C++ broker, exiting test");
> + return;
> + }
> +
> + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
> +
> + String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}";
> + AMQDestination dest = new AMQAnyDestination(addr);
> + MessageConsumer cons = jmsSession.createConsumer(dest);
> + MessageProducer prod = jmsSession.createProducer(dest);
> +
> + for (int i=0; i< 15; i++)
> + {
> + prod.send(jmsSession.createTextMessage("msg" + i) );
> + }
> +
> + for (int i=0; i< 9; i++)
> + {
> + cons.receive();
> + }
> + Message msg = cons.receive(RECEIVE_TIMEOUT);
> + assertNotNull("Should have received the 10th message",msg);
> + assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
> + msg.acknowledge();
> + for (int i=11; i<16; i++)
> + {
> + assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
> + }
> + }
> +
> /*public void testBindQueueForXMLExchange() throws Exception
> {
> if (!isCppBroker())
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>
--
Martin Ritchie
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org