You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/05/03 22:19:45 UTC
qpid-jms git commit: QPIDJMS-173 Add options to control consumer
presettle state.
Repository: qpid-jms
Updated Branches:
refs/heads/master 777cc4614 -> c3e37f272
QPIDJMS-173 Add options to control consumer presettle state.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c3e37f27
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c3e37f27
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c3e37f27
Branch: refs/heads/master
Commit: c3e37f27286b4e91d3bf0ac6fceb3c0ac7956dfd
Parents: 777cc46
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 3 16:19:26 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 3 16:19:26 2016 -0400
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsMessageConsumer.java | 1 +
.../org/apache/qpid/jms/JmsMessageProducer.java | 2 +-
.../org/apache/qpid/jms/JmsPresettlePolicy.java | 116 +++++++++++++-
.../java/org/apache/qpid/jms/JmsSession.java | 2 +-
.../PresettledConsumerIntegrationTest.java | 153 ++++++++++++++++---
5 files changed, 248 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 826438c..46f19ca 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -98,6 +98,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy));
consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry());
+ consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(destination, session));
session.getConnection().createResource(consumerInfo);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 84e4017..0cb1646 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -56,7 +56,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
this.anonymousProducer = destination == null;
this.producerInfo = new JmsProducerInfo(producerId);
this.producerInfo.setDestination(destination);
- this.producerInfo.setPresettle(session.getPresettlePolicy().isSendPresttled(destination, session));
+ this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(destination, session));
session.getConnection().createResource(producerInfo);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
index c6079d6..bf15ef0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
@@ -23,11 +23,17 @@ package org.apache.qpid.jms;
public class JmsPresettlePolicy {
private boolean presettleAll;
+
private boolean presettleProducers;
private boolean presettleTopicProducers;
private boolean presettleQueueProducers;
private boolean presettleTransactedProducers;
+ private boolean presettleConsumers;
+ private boolean presettleTopicConsumers;
+ private boolean presettleQueueConsumers;
+ private boolean presettleTransactedConsumers;
+
public JmsPresettlePolicy() {
}
@@ -37,6 +43,10 @@ public class JmsPresettlePolicy {
this.presettleTopicProducers = source.presettleTopicProducers;
this.presettleQueueProducers = source.presettleQueueProducers;
this.presettleTransactedProducers = source.presettleTransactedProducers;
+ this.presettleConsumers = source.presettleConsumers;
+ this.presettleTopicConsumers = source.presettleTopicConsumers;
+ this.presettleQueueConsumers = source.presettleQueueConsumers;
+ this.presettleTransactedConsumers = source.presettleTransactedConsumers;
}
public JmsPresettlePolicy copy() {
@@ -148,11 +158,11 @@ public class JmsPresettlePolicy {
* @param destination
* the destination that the producer will be sending to.
* @param session
- * the session that owns the producer that will send be sending a message.
+ * the session that owns the producer.
*
* @return true if the producer should send presettled.
*/
- public boolean isSendPresttled(JmsDestination destination, JmsSession session) {
+ public boolean isProducerPresttled(JmsDestination destination, JmsSession session) {
if (presettleAll || presettleProducers) {
return true;
@@ -166,4 +176,106 @@ public class JmsPresettlePolicy {
return false;
}
+
+ /**
+ * @return the presettleConsumers configuration value for this policy.
+ */
+ public boolean isPresettleConsumers() {
+ return presettleConsumers;
+ }
+
+ /**
+ * The presettle all consumers value to apply. When true all MessageConsumer
+ * instances created will indicate that presettled messages are requested.
+ *
+ * @param presettleConsumers
+ * the presettleConsumers value to apply to this policy.
+ */
+ public void setPresettleConsumers(boolean presettleConsumers) {
+ this.presettleConsumers = presettleConsumers;
+ }
+
+ /**
+ * @return the presettleTopicConsumers setting for this policy.
+ */
+ public boolean isPresettleTopicConsumers() {
+ return presettleTopicConsumers;
+ }
+
+ /**
+ * The presettle Topic consumers value to apply. When true any MessageConsumer for
+ * a Topic destination will indicate that presettled messages are requested.
+ *
+ * @param presettleTopicConsumers
+ * the presettleTopicConsumers value to apply to this policy.
+ */
+ public void setPresettleTopicConsumers(boolean presettleTopicConsumers) {
+ this.presettleTopicConsumers = presettleTopicConsumers;
+ }
+
+ /**
+ * @return the presettleQueueConsumers setting for this policy.
+ */
+ public boolean isPresettleQueueConsumers() {
+ return presettleQueueConsumers;
+ }
+
+ /**
+ * The presettle Queue consumers value to apply. When true any MessageConsumer for
+ * a Queue destination will indicate that presettled messages are requested.
+ *
+ * @param presettleQueueConsumers
+ * the presettleQueueConsumers value to apply to this policy.
+ */
+ public void setPresettleQueueConsumers(boolean presettleQueueConsumers) {
+ this.presettleQueueConsumers = presettleQueueConsumers;
+ }
+
+ /**
+ * @return the presettleTransactedConsumers setting for this policy.
+ */
+ public boolean isPresettleTransactedConsumers() {
+ return presettleTransactedConsumers;
+ }
+
+ /**
+ * The presettle consumers inside a transaction value to apply. When true all the
+ * MessageConsumer created in a transacted Session will indicate that presettled
+ * messages are requested.
+ *
+ * @param presettleTransactedConsumers
+ * the presettleTransactedConsumers value to apply to this policy.
+ */
+ public void setPresettleTransactedConsumers(boolean presettleTransactedConsumers) {
+ this.presettleTransactedConsumers = presettleTransactedConsumers;
+ }
+
+ /**
+ * Determines when a consumer will be created with the settlement mode set to presettled.
+ * <p>
+ * Called when the a consumer is being created to determine whether the consumer will
+ * be configured to request that the remote sends it message that are presettled.
+ * <p>
+ *
+ * @param destination
+ * the destination that the consumer will be listening to.
+ * @param session
+ * the session that owns the consumer being created.
+ *
+ * @return true if the producer should send presettled.
+ */
+ public boolean isConsumerPresttled(JmsDestination destination, JmsSession session) {
+
+ if (presettleAll || presettleConsumers) {
+ return true;
+ } else if (session.isTransacted() && presettleTransactedConsumers) {
+ return true;
+ } else if (destination != null && destination.isQueue() && presettleQueueConsumers) {
+ return true;
+ } else if (destination != null && destination.isTopic() && presettleTopicConsumers) {
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a3e2b27..64c514e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -699,7 +699,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
envelope.setDispatchId(messageSequence);
if (producer.isAnonymous()) {
- envelope.setPresettle(presettlePolicy.isSendPresttled(destination, this));
+ envelope.setPresettle(presettlePolicy.isProducerPresttled(destination, this));
}
transactionContext.send(connection, envelope);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
index bfddcf5..ff98387 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java
@@ -48,77 +48,183 @@ public class PresettledConsumerIntegrationTest extends QpidJmsTestCase {
//----- Test the amqp.presettleConsumers option --------------------------//
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedToTopic() throws Exception {
+ public void testPresettledConsumersConfigurationAppliedToTopic() throws Exception {
String presettleConfig = "?amqp.presettleConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedToQueue() throws Exception {
+ public void testPresettledConsumersConfigurationAppliedToQueue() throws Exception {
String presettleConfig = "?amqp.presettleConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception {
+ public void testPresettledConsumersConfigurationAppliedToTempTopic() throws Exception {
String presettleConfig = "?amqp.presettleConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception {
+ public void testPresettledConsumersConfigurationAppliedToTempQueue() throws Exception {
String presettleConfig = "?amqp.presettleConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
}
+ //----- Test the jms.presettlePolicy.presettleAll option -----------------//
+
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettleAllConfigurationAppliedToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettledAllConfigurationAppliedToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettledAllConfigurationAppliedToTempTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettledAllConfigurationAppliedToTempQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
}
+ //----- Test the jms.presettlePolicy.presettleTopicConsumers option ------//
+
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTopicNoRelaySupport() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettleTopicConsumersConfigurationAppliedToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToQueueNoRelaySupport() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
- doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+ public void testPresettledTopicConsumersConfigurationAppliedToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Queue.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopicNoRelaySupport() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettledTopicConsumersConfigurationAppliedToTempTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class);
}
@Test(timeout = 20000)
- public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueueNoRelaySupport() throws Exception {
- String presettleConfig = "?amqp.presettleConsumers=true";
+ public void testPresettledTopicConsumersConfigurationAppliedToTempQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryQueue.class);
+ }
+
+ //----- Test the jms.presettlePolicy.presettleQueueConsumers option ----- //
+
+ @Test(timeout = 20000)
+ public void testPresettleQueueConsumersConfigurationAppliedToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledQueueConsumersConfigurationAppliedToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledQueueConsumersConfigurationAppliedToTempTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledQueueConsumersConfigurationAppliedToTempQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true";
doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class);
}
+ //----- Test the jms.presettlePolicy.presettleTransactedConsumers option -//
+
+ @Test(timeout = 20000)
+ public void testPresettleTransactedConsumersConfigurationAppliedToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToTempTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToTempQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, TemporaryQueue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettleTransactedConsumersConfigurationAppliedToTopicNoTX() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToQueueNoTX() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToTempTopicNoTX() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTransactedConsumersConfigurationAppliedToTempQueueNoTX() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryQueue.class);
+ }
+
+ //----- Test the presettled consumer still settles if needed -------------//
+
+ @Test(timeout = 20000)
+ public void testPresettledTopicConsumerSettlesWhenNeeded() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledQueueConsumerSettlesWhenNeeded() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTempTopicConsumerSettlesWhenNeeded() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryTopic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledTempQueueConsumerSettlesWhenNeeded() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryQueue.class);
+ }
+
//----- Test Method implementation ---------------------------------------//
private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
@@ -174,7 +280,10 @@ public class PresettledConsumerIntegrationTest extends QpidJmsTestCase {
}
// Send a settled transfer, client should not send any dispositions
- testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true);
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, transferSettled);
+ if (!transferSettled) {
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ }
MessageConsumer consumer = session.createConsumer(destination);
assertNotNull(consumer.receive(100));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org