You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/23 15:00:05 UTC
svn commit: r1626995 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/messaging/address/
systests/src/test/java/org/apache/qpid/test/client/destination/
test-profiles/
Author: rgodfrey
Date: Tue Sep 23 13:00:05 2014
New Revision: 1626995
URL: http://svn.apache.org/r1626995
Log:
QPID-3678 : [Java Client] Add support for setting link capacity to zero in ADDR addresses
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Sep 23 13:00:05 2014
@@ -389,7 +389,7 @@ public class BasicMessageConsumer_0_10 e
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
+ if (_0_10session.isStarted() && isMessageListenerSet() && _capacity == 0 && getSynchronousQueue().isEmpty())
{
messageFlow();
}
@@ -536,7 +536,7 @@ public class BasicMessageConsumer_0_10 e
private long evaluateCapacity(AMQDestination destination)
{
long capacity = 0;
- if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() >= 0)
{
capacity = destination.getLink().getConsumerCapacity();
}
@@ -547,4 +547,75 @@ public class BasicMessageConsumer_0_10 e
return capacity;
}
+ @Override
+ public Message receive(final long l) throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+
+ }
+
+ Message message = super.receive(l);
+
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ @Override
+ public Message receiveNoWait() throws JMSException
+ {
+ long capacity = getCapacity();
+ try
+ {
+ AMQSession_0_10 session = (AMQSession_0_10) getSession();
+
+ if (capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+
+ session.sync();
+ }
+ Message message = super.receiveNoWait();
+ if (message == null && capacity == 0 && getMessageListener() == null)
+ {
+ session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 0,
+ Option.UNRELIABLE);
+ session.sync();
+
+ message = super.receiveNoWait();
+ }
+ return message;
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Tue Sep 23 13:00:05 2014
@@ -242,22 +242,17 @@ public class AddressHelper
if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
- MapAccessor capacityProps = new MapAccessor(
- (Map) ((Map) _address.getOptions().get(LINK))
- .get(CAPACITY));
- link
- .setConsumerCapacity(capacityProps
- .getInt(CAPACITY_SOURCE) == null ? 0
- : capacityProps.getInt(CAPACITY_SOURCE));
- link
- .setProducerCapacity(capacityProps
- .getInt(CAPACITY_TARGET) == null ? 0
- : capacityProps.getInt(CAPACITY_TARGET));
+ MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
+
+ Integer sourceCapacity = capacityProps.getInt(CAPACITY_SOURCE);
+ link.setConsumerCapacity(sourceCapacity == null ? -1 : sourceCapacity);
+
+ Integer targetCapacity = capacityProps.getInt(CAPACITY_TARGET);
+ link.setProducerCapacity(targetCapacity == null ? -1 : targetCapacity);
}
else
{
- int cap = _linkPropAccess.getInt(CAPACITY) == null ? 0 : _linkPropAccess
- .getInt(CAPACITY);
+ int cap = _linkPropAccess.getInt(CAPACITY) == null ? -1 : _linkPropAccess.getInt(CAPACITY);
link.setConsumerCapacity(cap);
link.setProducerCapacity(cap);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Tue Sep 23 13:00:05 2014
@@ -38,8 +38,8 @@ public class Link
private FilterType _filterType = FilterType.SUBJECT;
private boolean _isNoLocal;
private boolean _isDurable;
- private int _consumerCapacity = 0;
- private int _producerCapacity = 0;
+ private int _consumerCapacity = -1;
+ private int _producerCapacity = -1;
private Subscription subscription;
private Reliability reliability = Reliability.AT_LEAST_ONCE;
private List<Binding> _bindings = Collections.emptyList();
Modified: qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Tue Sep 23 13:00:05 2014
@@ -454,6 +454,45 @@ public class AddressBasedDestinationTest
checkQueueForBindings(jmsSession,dest2,headersBinding);
}
+ public void testZeroCapacityForSynchronousReceive() throws Exception
+ {
+ Session session1 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session2 = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String addressString = "ADDR:my-queue; {create: always, link:{capacity: 0}}";
+ Queue session1queue = session1.createQueue(addressString);
+ Queue session2queue = session1.createQueue(addressString);
+ MessageConsumer consumer1 = session1.createConsumer(session1queue);
+ MessageConsumer consumer1withSelector = session1.createConsumer(session1queue, "key1 = 1");
+ MessageConsumer consumer2withSelector = session2.createConsumer(session2queue, "key2 = 2");
+
+ _connection.start();
+
+ MessageProducer producer = session1.createProducer(session1queue);
+
+ Message m = session1.createMessage();
+ m.setIntProperty("key1", 1);
+ m.setIntProperty("key2", 2);
+ producer.send(m);
+
+ m = session1.createMessage();
+ m.setIntProperty("key1", 1);
+ producer.send(m);
+
+ m = session1.createMessage();
+ producer.send(m);
+
+ m = session1.createMessage();
+ m.setIntProperty("key2", 2);
+ producer.send(m);
+
+ assertNotNull("First message from queue should be received",consumer1withSelector.receive(1000l));
+ assertNotNull("Last message on queue should be received", consumer2withSelector.receive(1000l));
+ assertNotNull("Second message from queue should be received", consumer1.receive(1000l));
+ assertNull("Only message remaining shouldn't match selector",consumer1withSelector.receive(500l));
+ assertNotNull("Should have been one message remaining on queue",consumer1.receive(1000l));
+ assertNull("No messages should be remaining on queue",consumer1.receive(500l));
+ }
+
/**
* Test goal: Verifies the capacity property in address string is handled properly.
* Test strategy:
Modified: qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes?rev=1626995&r1=1626994&r2=1626995&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaPre010Excludes Tue Sep 23 13:00:05 2014
@@ -30,6 +30,8 @@ org.apache.qpid.test.client.destination.
//QPID-3392: the Java broker does not yet implement exchange creation arguments
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-3678: zero capacity not supported in 0-9-1
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testZeroCapacityForSynchronousReceive
//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
// you want a topic behaviour. The 0-10 client thinks you must want a queue.
org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org