You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/11 16:42:24 UTC
qpid-jms git commit: QPIDJMS-219: allow listeners when prefetch is 0,
flow a new credit as needed [Forced Update!]
Repository: qpid-jms
Updated Branches:
refs/heads/master 11db59d37 -> 06a721625 (forced update)
QPIDJMS-219: allow listeners when prefetch is 0, flow a new credit as needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/06a72162
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/06a72162
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/06a72162
Branch: refs/heads/master
Commit: 06a7216253c2640aeba37ec3e889ff1598037780
Parents: b94efb7
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Nov 11 16:41:43 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Nov 11 16:41:43 2016 +0000
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 26 ++++++--
.../apache/qpid/jms/meta/JmsConsumerInfo.java | 10 +++
.../qpid/jms/provider/amqp/AmqpConsumer.java | 16 ++++-
.../integration/ConsumerIntegrationTest.java | 34 -----------
.../ZeroPrefetchIntegrationTest.java | 64 ++++++++++++++++++++
.../qpid/jms/meta/JmsConsumerInfoTest.java | 11 +++-
6 files changed, 119 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index ea93019..b224574 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -117,7 +117,9 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
}
public void init() throws JMSException {
- startConsumerResource();
+ if(!isPullConsumer()){
+ startConsumerResource();
+ }
}
private void startConsumerResource() throws JMSException {
@@ -587,12 +589,16 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
- if (consumerInfo.getPrefetchSize() == 0) {
- throw new JMSException("Illegal prefetch size of zero. This setting is not supported" +
- "for asynchronous consumers please set a value of at least 1");
- }
this.messageListener = listener;
- drainMessageQueueToListener();
+ if(listener != null) {
+ consumerInfo.setListener(true);
+ if(isPullConsumer()){
+ startConsumerResource();
+ }
+ drainMessageQueueToListener();
+ } else {
+ consumerInfo.setListener(false);
+ }
}
@Override
@@ -741,6 +747,14 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
// other being an error while attempting to copy the incoming message.
// We need to decide how to respond to these.
session.getConnection().onException(e);
+ } finally {
+ if (isPullConsumer()) {
+ try {
+ startConsumerResource();
+ } catch (JMSException e) {
+ LOG.error("Exception during credit replenishment for consumer listener {}", getConsumerId(), e);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index dd34397..b22b3b5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -35,6 +35,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
private int acknowledgementMode;
private boolean localMessageExpiry;
private boolean presettle;
+ private volatile boolean listener;
private JmsRedeliveryPolicy redeliveryPolicy;
private JmsDeserializationPolicy deserializationPolicy;
@@ -75,6 +76,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
info.lastDeliveredSequenceId = lastDeliveredSequenceId;
info.redeliveryPolicy = getRedeliveryPolicy().copy();
info.deserializationPolicy = getDeserializationPolicy().copy();
+ info.listener = listener;
}
public boolean isDurable() {
@@ -94,6 +96,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume
this.browser = browser;
}
+ public boolean isListener() {
+ return listener;
+ }
+
+ public void setListener(boolean listener) {
+ this.listener = listener;
+ }
+
public JmsDestination getDestination() {
return destination;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0077478..b60bb83 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -84,7 +84,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* The request that awaits completion of the consumer start.
*/
public void start(AsyncResult request) {
- sendFlowIfNeeded();
+ JmsConsumerInfo consumerInfo = getResourceInfo();
+ if(consumerInfo.isListener() && consumerInfo.getPrefetchSize() == 0) {
+ sendFlowForNoPrefetchListener();
+ } else {
+ sendFlowIfNeeded();
+ }
request.onSuccess();
}
@@ -309,6 +314,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
}
+ private void sendFlowForNoPrefetchListener() {
+ int currentCredit = getEndpoint().getCredit();
+ if (currentCredit < 1) {
+ int additionalCredit = 1 - currentCredit;
+ LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), additionalCredit);
+ getEndpoint().flow(additionalCredit);
+ }
+ }
+
/**
* Recovers all previously delivered but not acknowledged messages.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 63581c1..c48961d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -622,40 +622,6 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
}
}
- @Test(timeout=20000)
- public void testCannotUseMessageListener() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
- connection.start();
-
- testPeer.expectBegin();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(getTestName());
-
- testPeer.expectReceiverAttach();
-
- MessageConsumer consumer = session.createConsumer(destination);
- MessageListener listener = new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- }
- };
-
- try {
- consumer.setMessageListener(listener);
- fail("Should not allow listener to be set when prefetch is zero.");
- } catch (JMSException ex) {
- }
-
- testPeer.expectClose();
- connection.close();
-
- testPeer.waitForAllHandlersToComplete(2000);
- }
- }
-
@Test(timeout = 20000)
public void testCreateProducerInOnMessage() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index d558cf6..29a1d46 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -24,10 +24,13 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -123,4 +126,65 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Test(timeout=20000)
+ public void testZeroPrefetchMessageListener() throws Exception {
+ final CountDownLatch msgReceived = new CountDownLatch(1);
+ final CountDownLatch completeOnMessage = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Create a connection with zero prefetch
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+
+ // Expected the consumer to attach but NOT send credit
+ testPeer.expectReceiverAttach();
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ MessageListener listener = new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ msgReceived.countDown();
+
+ try {
+ completeOnMessage.await(6, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ // Expect that once setMessageListener is called, it flows 1 credit with drain=false. Then give it a message.
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+ consumer.setMessageListener(listener);
+
+ // Wait for message to arrive
+ assertTrue("message not received in given time", msgReceived.await(6, TimeUnit.SECONDS));
+
+ // Ensure the handlers are complete at the peer
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ // Now allow onMessage to complete, expecting an accept and another flow.
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+ testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+ completeOnMessage.countDown();
+
+ // Wait for the resulting flow to be received
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/06a72162/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index 87f0908..9bca41e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -93,6 +93,7 @@ public class JmsConsumerInfoTest {
info.setSelector("select");
info.setSubscriptionName("name");
info.setRedeliveryPolicy(new JmsDefaultRedeliveryPolicy());
+ info.setListener(true);
JmsConsumerInfo copy = info.copy();
@@ -105,7 +106,7 @@ public class JmsConsumerInfoTest {
assertEquals(123456, copy.getPrefetchSize());
assertEquals("select", copy.getSelector());
assertEquals("name", copy.getSubscriptionName());
-
+ assertEquals(true, copy.isListener());
assertNotSame(info.getRedeliveryPolicy(), copy.getRedeliveryPolicy());
assertEquals(info, copy);
@@ -182,4 +183,12 @@ public class JmsConsumerInfoTest {
assertNotNull(info.getRedeliveryPolicy());
assertTrue(info.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy);
}
+
+ @Test
+ public void testIsListener() {
+ JmsConsumerInfo info = new JmsConsumerInfo(firstId);
+ assertFalse(info.isListener());
+ info.setListener(true);
+ assertTrue(info.isListener());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org