You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/23 15:00:05 UTC

svn commit: r1626995 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/messaging/address/ systests/src/test/java/org/apache/qpid/test/client/destination/ test-profiles/

Author: rgodfrey
Date: Tue Sep 23 13:00:05 2014
New Revision: 1626995

URL: http://svn.apache.org/r1626995
Log:
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
    qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 23 13:00:05 2014
@@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 e
         {
             _syncReceive.set(true);
         }
-        if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
+        if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty())
         {
             messageFlow();
         }
@@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 e
     private long evaluateCapacity(AMQDestination destination)
     {
         long capacity = 0;
-        if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+        if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0)
         {
             capacity = destination.getLink().getConsumerCapacity();
         }
@@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 e
         return capacity;
     }
 
+    @Override
+    public Message receive(final long l) throws JMSException
+    {
+        long capacity = getCapacity();
+        try
+        {
+            AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+            if (capacity == 0 && getMessageListener() == null)
+            {
+                session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                     MessageCreditUnit.MESSAGE, 1,
+                                                     Option.UNRELIABLE);
+
+                session.sync();
+
+            }
+
+            Message message = super.receive(l);
+
+            if (message == null && capacity == 0 && getMessageListener() == null)
+            {
+                session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                     MessageCreditUnit.MESSAGE, 0,
+                                                     Option.UNRELIABLE);
+                session.sync();
+
+                message = super.receiveNoWait();
+            }
+            return message;
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException
+    {
+        long capacity = getCapacity();
+        try
+        {
+            AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+            if (capacity == 0 && getMessageListener() == null)
+            {
+                session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                     MessageCreditUnit.MESSAGE, 1,
+                                                     Option.UNRELIABLE);
+
+                session.sync();
+            }
+            Message message = super.receiveNoWait();
+            if (message == null && capacity == 0 && getMessageListener() == null)
+            {
+                session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                     MessageCreditUnit.MESSAGE, 0,
+                                                     Option.UNRELIABLE);
+                session.sync();
+
+                message = super.receiveNoWait();
+            }
+            return message;
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException(e);
+        }
+
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Tue Sep 23 13:00:05 2014
@@ -242,22 +242,17 @@ public class AddressHelper
             
             if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
             {
-                MapAccessor capacityProps = new MapAccessor(
-                        (Map) ((Map) _address.getOptions().get(LINK))
-                                .get(CAPACITY));
-                link
-                        .setConsumerCapacity(capacityProps
-                                .getInt(CAPACITY_SOURCE) == null ? 0
-                                : capacityProps.getInt(CAPACITY_SOURCE));
-                link
-                        .setProducerCapacity(capacityProps
-                                .getInt(CAPACITY_TARGET) == null ? 0
-                                : capacityProps.getInt(CAPACITY_TARGET));
+                MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
+
+                Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE);
+                link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity);
+
+                Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET);
+                link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity);
             } 
             else
             {
-                int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
-                        .getInt(CAPACITY);
+                int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY);
                 link.setConsumerCapacity(cap);
                 link.setProducerCapacity(cap);
             }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Tue Sep 23 13:00:05 2014
@@ -38,8 +38,8 @@ public class Link
     private FilterType _filterType = FilterType.SUBJECT;
     private boolean _isNoLocal;
     private boolean _isDurable;
-    private int _consumerCapacity = 0;
-    private int _producerCapacity = 0;
+    private int _consumerCapacity = -1;
+    private int _producerCapacity = -1;
     private Subscription subscription;
     private Reliability reliability = Reliability.AT_LEAST_ONCE;
     private List<Binding> _bindings = Collections.emptyList();

Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Sep 23 13:00:05 2014
@@ -454,6 +454,45 @@ public class AddressBasedDestinationTest
         checkQueueForBindings(jmsSession,dest2,headersBinding);
     }
 
+    public void testZeroCapacityForSynchronousReceive() throws Exception
+    {
+        Session session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String addressString = "ADDR:my-queue; {create: always, link:{capacity: 0}}";
+        Queue session1queue = session1.createQueue(addressString);
+        Queue session2queue = session1.createQueue(addressString);
+        MessageConsumer consumer1 = session1.createConsumer(session1queue);
+        MessageConsumer consumer1withSelector = session1.createConsumer(session1queue, "key1 = 1");
+        MessageConsumer consumer2withSelector = session2.createConsumer(session2queue, "key2 = 2");
+
+        _connection.start();
+
+        MessageProducer producer = session1.createProducer(session1queue);
+
+        Message m = session1.createMessage();
+        m.setIntProperty("key1", 1);
+        m.setIntProperty("key2", 2);
+        producer.send(m);
+
+        m = session1.createMessage();
+        m.setIntProperty("key1", 1);
+        producer.send(m);
+
+        m = session1.createMessage();
+        producer.send(m);
+
+        m = session1.createMessage();
+        m.setIntProperty("key2", 2);
+        producer.send(m);
+
+        assertNotNull("First message from queue should be received",consumer1withSelector.receive(1000l));
+        assertNotNull("Last message on queue should be received", consumer2withSelector.receive(1000l));
+        assertNotNull("Second message from queue should be received", consumer1.receive(1000l));
+        assertNull("Only message remaining shouldn't match selector",consumer1withSelector.receive(500l));
+        assertNotNull("Should have been one message remaining on queue",consumer1.receive(1000l));
+        assertNull("No messages should be remaining on queue",consumer1.receive(500l));
+    }
+
     /**
      * Test goal: Verifies the capacity property in address string is handled properly.
      * Test strategy:

Modified: qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes Tue Sep 23 13:00:05 2014
@@ -30,6 +30,8 @@ org.apache.qpid.test.client.destination.
 //QPID-3392: the Java broker does not yet implement exchange creation arguments
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-3678: zero capacity not supported in 0-9-1
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testZeroCapacityForSynchronousReceive
 //QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
 //           you want a topic behaviour.  The 0-10 client thinks you must want a queue.
 org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org