You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/05/19 01:05:36 UTC

svn commit: r945945 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/destination/

Author: rajith
Date: Tue May 18 23:05:36 2010
New Revision: 945945

URL: http://svn.apache.org/viewvc?rev=945945&view=rev
Log:
Implemented the feature described in QPID-2515
However a few issues needs to be ironed out - see the JIRA for these issues.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue May 18 23:05:36 2010
@@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQ
      * USed to store the range of in tx messages
      */
     private RangeSet _txRangeSet = new RangeSet();
-    private int _txSize = 0;
+    private int _txSize = 0;    
     //--- constructors
 
     /**
@@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQ
             throws AMQException, FailoverException
     {
         boolean preAcquire;
+        
+        long capacity = getCapacity(consumer.getDestination());
+        
         try
         {
             preAcquire = ( ! consumer.isNoConsume()  &&
@@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQ
 
         String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
 
-        if (! prefetch())
+        if (capacity == 0)
         {
             getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
         }
@@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQ
         getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
                                      Option.UNRELIABLE);
                 
-        if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch))
+        if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
         {
             // set the flow
             getQpidSession().messageFlow(consumerTag,
                                          MessageCreditUnit.MESSAGE,
-                                         getAMQConnection().getMaxPrefetch(),
+                                         capacity,
                                          Option.UNRELIABLE);
         }
 
@@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQ
             getCurrentException();
         }
     }
+    
+    private long getCapacity(AMQDestination destination)
+    {
+        long capacity = 0;
+        if (destination.getDestSyntax() == DestSyntax.ADDR && 
+                destination.getSourceLink().getCapacity() > 0)
+        {
+            capacity = destination.getSourceLink().getCapacity();
+        }
+        else if (prefetch())
+        {
+            capacity = getAMQConnection().getMaxPrefetch();
+        }
+        return capacity;
+    }
 
     /**
      * Create an 0_10 message producer
@@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQ
                 //only set if msg list is null
                 try
                 {
-                    if (! prefetch())
+                    long capacity = getCapacity(consumer.getDestination());
+                    
+                    if (capacity == 0)
                     {
                         if (consumer.getMessageListener() != null)
                         {
@@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQ
                     {
                         getQpidSession()
                             .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
-                                         getAMQConnection().getMaxPrefetch(),
+                                         capacity,
                                          Option.UNRELIABLE);
                     }
                     getQpidSession()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue May 18 23:05:36 2010
@@ -19,6 +19,7 @@ package org.apache.qpid.client;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
@@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 e
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
     private String _consumerTagString;
-
+    
+    private long capacity = 0;
+        
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 e
             }
         }
         _isStarted = connection.started();
+        
+        // Destination setting overrides connection defaults
+        if (destination.getDestSyntax() == DestSyntax.ADDR && 
+                destination.getSourceLink().getCapacity() > 0)
+        {
+            capacity = destination.getSourceLink().getCapacity();
+        }
+        else if (getSession().prefetch())
+        {
+            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+        }
+        
     }
 
 
@@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 e
         }
         if (messageOk)
         {
-            if (isMessageListenerSet() && ! getSession().prefetch())
+            if (isMessageListenerSet() && capacity == 0)
             {
                 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                           MessageCreditUnit.MESSAGE, 1,
@@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 e
             }
             // if we are syncrhonously waiting for a message
             // and messages are not prefetched we then need to request another one
-            if(! getSession().prefetch())
+            if(capacity == 0)
             {
                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                          MessageCreditUnit.MESSAGE, 1,
@@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 e
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
         super.setMessageListener(messageListener);
-        if (messageListener != null && ! getSession().prefetch())
+        if (messageListener != null && capacity == 0)
         {
             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1,
@@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 e
      */
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if (! getSession().prefetch())
+        if (capacity == 0)
         {
             _syncReceive.set(true);
         }
-        if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+        if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
         {
             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1,
@@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 e
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())
         {
+           
             _0_10session.getQpidSession().messageFlush
                 (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
             _0_10session.getQpidSession().sync();
             _0_10session.getQpidSession().messageFlow
                 (getConsumerTagString(), MessageCreditUnit.BYTE,
                  0xFFFFFFFF, Option.UNRELIABLE);
-            if (getSession().prefetch())
+            
+            if (capacity > 0)
             {
                 _0_10session.getQpidSession().messageFlow
-                    (getConsumerTagString(), MessageCreditUnit.MESSAGE,
-                     _0_10session.getAMQConnection().getMaxPrefetch(),
-                     Option.UNRELIABLE);
+                                               (getConsumerTagString(),
+                                                MessageCreditUnit.MESSAGE,
+                                                capacity,
+                                                Option.UNRELIABLE);
             }
             _0_10session.syncDispatchQueue();
             o = super.getMessageFromQueue(-1);
         }
-        if (! getSession().prefetch())
+        if (capacity == 0)
         {
             _syncReceive.set(false);
         }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=945945&r1=945944&r2=945945&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue May 18 23:05:36 2010
@@ -21,8 +21,12 @@ package org.apache.qpid.test.client.dest
  */
 
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import javax.jms.Connection;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTe
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import edu.emory.mathcs.backport.java.util.Collections;
-
 public class AddressBasedDestinationTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
@@ -48,7 +50,7 @@ public class AddressBasedDestinationTest
     public void setUp() throws Exception
     {
         super.setUp();
-        _connection = getConnection();
+        _connection = getConnection() ;
         _connection.start();
     }
     
@@ -211,6 +213,7 @@ public class AddressBasedDestinationTest
                                   "}, " +   
                                   "x-bindings: [{exchange : 'amq.direct', key : test}, " + 
                                                "{exchange : 'amq.fanout'}," +
+                                               "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
                                                "{exchange : 'amq.topic', key : 'a.#'}" +
                                               "]," + 
                                      
@@ -236,7 +239,15 @@ public class AddressBasedDestinationTest
         
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", 
-                    dest.getAddressName(),"a.#", null));        
+                    dest.getAddressName(),"a.#", null));   
+        
+        Map<String,Object> args = new HashMap<String,Object>();
+        args.put("x-match","any");
+        args.put("dep","sales");
+        args.put("loc","CA");
+        assertTrue("Queue not bound as expected",(
+                (AMQSession_0_10)jmsSession).isQueueBound("amq.match", 
+                    dest.getAddressName(),null, args));
         
     }
     
@@ -273,8 +284,7 @@ public class AddressBasedDestinationTest
         // The existence of the queue is implicitly tested here
         assertTrue("Queue not bound as expected",(
                 (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", 
-                    dest.getQueueName(),"hello", Collections.emptyMap()));        
-        
+                    dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
     }
     
     public void testBindQueueWithArgs() throws Exception
@@ -331,6 +341,53 @@ public class AddressBasedDestinationTest
                     dest.getAddressName(),null, a.getOptions()));
     }
     
+    /**
+     * Test goal: Verifies the capacity property in address string is handled properly.
+     * Test strategy:
+     * Creates a destination with capacity 10.
+     * Creates consumer with client ack.
+     * Sends 15 messages to the queue, tries to receive 10.
+     * Tries to receive the 11th message and checks if its null.
+     * 
+     * Since capacity is 10 and we haven't acked any messages, 
+     * we should not have received the 11th.
+     * 
+     * Acks the 10th message and verifies we receive the rest of the msgs.
+     */
+    public void testLinkCapacity() throws Exception
+    {
+        if (!isCppBroker())
+        {
+            _logger.info("Not C++ broker, exiting test");
+            return;
+        }
+        
+        Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+        
+        String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}";
+        AMQDestination dest = new AMQAnyDestination(addr);
+        MessageConsumer cons = jmsSession.createConsumer(dest); 
+        MessageProducer prod = jmsSession.createProducer(dest);
+        
+        for (int i=0; i< 15; i++)
+        {
+            prod.send(jmsSession.createTextMessage("msg" + i) );
+        }
+        
+        for (int i=0; i< 9; i++)
+        {
+            cons.receive();
+        }
+        Message msg = cons.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Should have received the 10th message",msg);        
+        assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
+        msg.acknowledge();
+        for (int i=11; i<16; i++)
+        {
+            assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
+        }
+    }
+    
     /*public void testBindQueueForXMLExchange() throws Exception
     {
         if (!isCppBroker())



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Re: svn commit: r945945 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/destination/

Posted by Martin Ritchie <ri...@apache.org>.
Hi Rajith is the testLinkCapacity testing some special CPP broker functionality?
+        if (!isCppBroker())
+        {
+            _logger.info("Not C++ broker, exiting test");
+            return;
+        }

If it is not for the Java broker it should be excluded in the
JavaExcludes file. I know that our current exclusion method is not the
best. Solutions on a postcard ;)

However, it appears to be testing 0-10 functionality so should it not
also run against the Java broker with 0-10?

Cheers
Martin

On 19 May 2010 00:05,  <ra...@apache.org> wrote:
> Author: rajith
> Date: Tue May 18 23:05:36 2010
> New Revision: 945945
>
> URL: http://svn.apache.org/viewvc?rev=945945&view=rev
> Log:
> Implemented the feature described in QPID-2515
> However a few issues needs to be ironed out - see the JIRA for these issues.
>
> Modified:
>    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
>    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
>    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue May 18 23:05:36 2010
> @@ -142,7 +142,7 @@ public class AMQSession_0_10 extends AMQ
>      * USed to store the range of in tx messages
>      */
>     private RangeSet _txRangeSet = new RangeSet();
> -    private int _txSize = 0;
> +    private int _txSize = 0;
>     //--- constructors
>
>     /**
> @@ -560,6 +560,9 @@ public class AMQSession_0_10 extends AMQ
>             throws AMQException, FailoverException
>     {
>         boolean preAcquire;
> +
> +        long capacity = getCapacity(consumer.getDestination());
> +
>         try
>         {
>             preAcquire = ( ! consumer.isNoConsume()  &&
> @@ -578,7 +581,7 @@ public class AMQSession_0_10 extends AMQ
>
>         String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
>
> -        if (! prefetch())
> +        if (capacity == 0)
>         {
>             getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
>         }
> @@ -589,12 +592,12 @@ public class AMQSession_0_10 extends AMQ
>         getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
>                                      Option.UNRELIABLE);
>
> -        if(prefetch() && _dispatcher != null && (isStarted() || _immediatePrefetch))
> +        if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
>         {
>             // set the flow
>             getQpidSession().messageFlow(consumerTag,
>                                          MessageCreditUnit.MESSAGE,
> -                                         getAMQConnection().getMaxPrefetch(),
> +                                         capacity,
>                                          Option.UNRELIABLE);
>         }
>
> @@ -604,6 +607,21 @@ public class AMQSession_0_10 extends AMQ
>             getCurrentException();
>         }
>     }
> +
> +    private long getCapacity(AMQDestination destination)
> +    {
> +        long capacity = 0;
> +        if (destination.getDestSyntax() == DestSyntax.ADDR &&
> +                destination.getSourceLink().getCapacity() > 0)
> +        {
> +            capacity = destination.getSourceLink().getCapacity();
> +        }
> +        else if (prefetch())
> +        {
> +            capacity = getAMQConnection().getMaxPrefetch();
> +        }
> +        return capacity;
> +    }
>
>     /**
>      * Create an 0_10 message producer
> @@ -744,7 +762,9 @@ public class AMQSession_0_10 extends AMQ
>                 //only set if msg list is null
>                 try
>                 {
> -                    if (! prefetch())
> +                    long capacity = getCapacity(consumer.getDestination());
> +
> +                    if (capacity == 0)
>                     {
>                         if (consumer.getMessageListener() != null)
>                         {
> @@ -757,7 +777,7 @@ public class AMQSession_0_10 extends AMQ
>                     {
>                         getQpidSession()
>                             .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
> -                                         getAMQConnection().getMaxPrefetch(),
> +                                         capacity,
>                                          Option.UNRELIABLE);
>                     }
>                     getQpidSession()
>
> Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
> +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue May 18 23:05:36 2010
> @@ -19,6 +19,7 @@ package org.apache.qpid.client;
>
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> +import org.apache.qpid.client.AMQDestination.DestSyntax;
>  import org.apache.qpid.client.message.*;
>  import org.apache.qpid.client.protocol.AMQProtocolHandler;
>  import org.apache.qpid.framing.FieldTable;
> @@ -72,7 +73,9 @@ public class BasicMessageConsumer_0_10 e
>      */
>     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
>     private String _consumerTagString;
> -
> +
> +    private long capacity = 0;
> +
>     //--- constructor
>     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
>                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
> @@ -100,6 +103,18 @@ public class BasicMessageConsumer_0_10 e
>             }
>         }
>         _isStarted = connection.started();
> +
> +        // Destination setting overrides connection defaults
> +        if (destination.getDestSyntax() == DestSyntax.ADDR &&
> +                destination.getSourceLink().getCapacity() > 0)
> +        {
> +            capacity = destination.getSourceLink().getCapacity();
> +        }
> +        else if (getSession().prefetch())
> +        {
> +            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
> +        }
> +
>     }
>
>
> @@ -146,7 +161,7 @@ public class BasicMessageConsumer_0_10 e
>         }
>         if (messageOk)
>         {
> -            if (isMessageListenerSet() && ! getSession().prefetch())
> +            if (isMessageListenerSet() && capacity == 0)
>             {
>                 _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
>                                                           MessageCreditUnit.MESSAGE, 1,
> @@ -245,7 +260,7 @@ public class BasicMessageConsumer_0_10 e
>             }
>             // if we are syncrhonously waiting for a message
>             // and messages are not prefetched we then need to request another one
> -            if(! getSession().prefetch())
> +            if(capacity == 0)
>             {
>                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
>                                                          MessageCreditUnit.MESSAGE, 1,
> @@ -333,7 +348,7 @@ public class BasicMessageConsumer_0_10 e
>     public void setMessageListener(final MessageListener messageListener) throws JMSException
>     {
>         super.setMessageListener(messageListener);
> -        if (messageListener != null && ! getSession().prefetch())
> +        if (messageListener != null && capacity == 0)
>         {
>             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
>                                                       MessageCreditUnit.MESSAGE, 1,
> @@ -372,11 +387,11 @@ public class BasicMessageConsumer_0_10 e
>      */
>     public Object getMessageFromQueue(long l) throws InterruptedException
>     {
> -        if (! getSession().prefetch())
> +        if (capacity == 0)
>         {
>             _syncReceive.set(true);
>         }
> -        if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
> +        if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
>         {
>             _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
>                                                       MessageCreditUnit.MESSAGE, 1,
> @@ -385,23 +400,26 @@ public class BasicMessageConsumer_0_10 e
>         Object o = super.getMessageFromQueue(l);
>         if (o == null && _0_10session.isStarted())
>         {
> +
>             _0_10session.getQpidSession().messageFlush
>                 (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
>             _0_10session.getQpidSession().sync();
>             _0_10session.getQpidSession().messageFlow
>                 (getConsumerTagString(), MessageCreditUnit.BYTE,
>                  0xFFFFFFFF, Option.UNRELIABLE);
> -            if (getSession().prefetch())
> +
> +            if (capacity > 0)
>             {
>                 _0_10session.getQpidSession().messageFlow
> -                    (getConsumerTagString(), MessageCreditUnit.MESSAGE,
> -                     _0_10session.getAMQConnection().getMaxPrefetch(),
> -                     Option.UNRELIABLE);
> +                                               (getConsumerTagString(),
> +                                                MessageCreditUnit.MESSAGE,
> +                                                capacity,
> +                                                Option.UNRELIABLE);
>             }
>             _0_10session.syncDispatchQueue();
>             o = super.getMessageFromQueue(-1);
>         }
> -        if (! getSession().prefetch())
> +        if (capacity == 0)
>         {
>             _syncReceive.set(false);
>         }
>
> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
> URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=945945&r1=945944&r2=945945&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
> +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue May 18 23:05:36 2010
> @@ -21,8 +21,12 @@ package org.apache.qpid.test.client.dest
>  */
>
>
> +import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.Map;
>  import javax.jms.Connection;
>  import javax.jms.JMSException;
> +import javax.jms.Message;
>  import javax.jms.MessageConsumer;
>  import javax.jms.MessageProducer;
>  import javax.jms.Session;
> @@ -37,8 +41,6 @@ import org.apache.qpid.test.utils.QpidTe
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> -import edu.emory.mathcs.backport.java.util.Collections;
> -
>  public class AddressBasedDestinationTest extends QpidTestCase
>  {
>     private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
> @@ -48,7 +50,7 @@ public class AddressBasedDestinationTest
>     public void setUp() throws Exception
>     {
>         super.setUp();
> -        _connection = getConnection();
> +        _connection = getConnection() ;
>         _connection.start();
>     }
>
> @@ -211,6 +213,7 @@ public class AddressBasedDestinationTest
>                                   "}, " +
>                                   "x-bindings: [{exchange : 'amq.direct', key : test}, " +
>                                                "{exchange : 'amq.fanout'}," +
> +                                               "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
>                                                "{exchange : 'amq.topic', key : 'a.#'}" +
>                                               "]," +
>
> @@ -236,7 +239,15 @@ public class AddressBasedDestinationTest
>
>         assertTrue("Queue not bound as expected",(
>                 (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
> -                    dest.getAddressName(),"a.#", null));
> +                    dest.getAddressName(),"a.#", null));
> +
> +        Map<String,Object> args = new HashMap<String,Object>();
> +        args.put("x-match","any");
> +        args.put("dep","sales");
> +        args.put("loc","CA");
> +        assertTrue("Queue not bound as expected",(
> +                (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
> +                    dest.getAddressName(),null, args));
>
>     }
>
> @@ -273,8 +284,7 @@ public class AddressBasedDestinationTest
>         // The existence of the queue is implicitly tested here
>         assertTrue("Queue not bound as expected",(
>                 (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
> -                    dest.getQueueName(),"hello", Collections.emptyMap()));
> -
> +                    dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
>     }
>
>     public void testBindQueueWithArgs() throws Exception
> @@ -331,6 +341,53 @@ public class AddressBasedDestinationTest
>                     dest.getAddressName(),null, a.getOptions()));
>     }
>
> +    /**
> +     * Test goal: Verifies the capacity property in address string is handled properly.
> +     * Test strategy:
> +     * Creates a destination with capacity 10.
> +     * Creates consumer with client ack.
> +     * Sends 15 messages to the queue, tries to receive 10.
> +     * Tries to receive the 11th message and checks if its null.
> +     *
> +     * Since capacity is 10 and we haven't acked any messages,
> +     * we should not have received the 11th.
> +     *
> +     * Acks the 10th message and verifies we receive the rest of the msgs.
> +     */
> +    public void testLinkCapacity() throws Exception
> +    {
> +        if (!isCppBroker())
> +        {
> +            _logger.info("Not C++ broker, exiting test");
> +            return;
> +        }
> +
> +        Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
> +
> +        String addr = "ADDR:my-queue; {create: always, link:{capacity: 10}}";
> +        AMQDestination dest = new AMQAnyDestination(addr);
> +        MessageConsumer cons = jmsSession.createConsumer(dest);
> +        MessageProducer prod = jmsSession.createProducer(dest);
> +
> +        for (int i=0; i< 15; i++)
> +        {
> +            prod.send(jmsSession.createTextMessage("msg" + i) );
> +        }
> +
> +        for (int i=0; i< 9; i++)
> +        {
> +            cons.receive();
> +        }
> +        Message msg = cons.receive(RECEIVE_TIMEOUT);
> +        assertNotNull("Should have received the 10th message",msg);
> +        assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
> +        msg.acknowledge();
> +        for (int i=11; i<16; i++)
> +        {
> +            assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
> +        }
> +    }
> +
>     /*public void testBindQueueForXMLExchange() throws Exception
>     {
>         if (!isCppBroker())
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:commits-subscribe@qpid.apache.org
>
>



-- 
Martin Ritchie

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org