You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/04/30 00:19:06 UTC
qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-131
Repository: qpid-jms
Updated Branches:
refs/heads/master 4915a8fdf -> bac662d54
https://issues.apache.org/jira/browse/QPIDJMS-131
Initial pass at adding a JmsPresettlePolicy to allow finer control
over presettlement in the JMS client.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bac662d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bac662d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bac662d5
Branch: refs/heads/master
Commit: bac662d540ab51e418ef2b328c56507d2f9f13cb
Parents: 4915a8f
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Apr 29 18:18:48 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Apr 29 18:18:48 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 13 +-
.../apache/qpid/jms/JmsConnectionFactory.java | 60 +-
.../org/apache/qpid/jms/JmsMessageProducer.java | 17 +-
.../org/apache/qpid/jms/JmsPresettlePolicy.java | 169 ++++++
.../apache/qpid/jms/JmsRedeliveryPolicy.java | 30 +
.../java/org/apache/qpid/jms/JmsSession.java | 14 +-
.../apache/qpid/jms/meta/JmsConnectionInfo.java | 24 +
.../apache/qpid/jms/meta/JmsProducerInfo.java | 19 +
.../amqp/AmqpAnonymousFallbackProducer.java | 6 +-
.../qpid/jms/provider/amqp/AmqpConnection.java | 1 +
.../jms/provider/amqp/AmqpFixedProducer.java | 9 +-
.../amqp/builders/AmqpProducerBuilder.java | 10 +-
.../PresettledProducerIntegrationTest.java | 563 +++++++++++++++++++
.../integration/ProducerIntegrationTest.java | 32 --
14 files changed, 883 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index fd2d5be..4374954 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private ExceptionListener exceptionListener;
private JmsMessageFactory messageFactory;
private Provider provider;
- private JmsMessageIDBuilder messageIDBuilder;
private final Set<JmsConnectionListener> connectionListeners =
new CopyOnWriteArraySet<JmsConnectionListener>();
@@ -863,6 +862,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
connectionInfo.setRedeliveryPolicy(redeliveryPolicy);
}
+ public JmsPresettlePolicy getPresettlePolicy() {
+ return connectionInfo.getPresettlePolicy();
+ }
+
+ public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+ connectionInfo.setPresettlePolicy(presettlePolicy);
+ }
+
public boolean isReceiveLocalOnly() {
return connectionInfo.isReceiveLocalOnly();
}
@@ -1003,11 +1010,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
public JmsMessageIDBuilder getMessageIDBuilder() {
- return messageIDBuilder;
+ return connectionInfo.getMessageIDBuilder();
}
void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
- this.messageIDBuilder = messageIDBuilder;
+ connectionInfo.setMessageIDBuilder(messageIDBuilder);
}
public boolean isPopulateJMSXUserID() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 087f18d..9fb708d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -86,6 +86,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
+ private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
public JmsConnectionFactory() {
@@ -290,39 +291,31 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
try {
if (this.remoteURI.getQuery() != null) {
Map<String, String> map = PropertyUtil.parseQuery(this.remoteURI.getQuery());
- Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");
-
- Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
- if (!unused.isEmpty()) {
- String msg = ""
- + " Not all jms options could be set on the ConnectionFactory."
- + " Check the options are spelled correctly."
- + " Unused parameters=[" + unused + "]."
- + " This connection factory cannot be started.";
- throw new IllegalArgumentException(msg);
- } else {
- this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map);
- }
+ applyURIOptions(map);
+ this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map);
} else if (URISupport.isCompositeURI(this.remoteURI)) {
CompositeData data = URISupport.parseComposite(this.remoteURI);
- Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");
- Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
- if (!unused.isEmpty()) {
- String msg = ""
- + " Not all jms options could be set on the ConnectionFactory."
- + " Check the options are spelled correctly."
- + " Unused parameters=[" + unused + "]."
- + " This connection factory cannot be started.";
- throw new IllegalArgumentException(msg);
- } else {
- this.remoteURI = data.toURI();
- }
+ applyURIOptions(data.getParameters());
+ this.remoteURI = data.toURI();
}
} catch (Exception e) {
throw new IllegalArgumentException(e.getMessage());
}
}
+ private void applyURIOptions(Map<String, String> options) throws IllegalArgumentException {
+ Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(options, "jms.");
+ Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap);
+ if (!unused.isEmpty()) {
+ String msg = ""
+ + " Not all jms options could be set on the ConnectionFactory."
+ + " Check the options are spelled correctly."
+ + " Unused parameters=[" + unused + "]."
+ + " This connection factory cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
/**
* @return the user name used for connection authentication.
*/
@@ -524,6 +517,23 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
}
/**
+ * @return the presettlePolicy that is currently configured.
+ */
+ public JmsPresettlePolicy getPresettlePolicy() {
+ return presettlePolicy;
+ }
+
+ /**
+ * Sets the JmsPresettlePolicy that is applied to MessageProducers.
+ *
+ * @param presettlePolicy
+ * the presettlePolicy to be used by connections created from this factory.
+ */
+ public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+ this.presettlePolicy = presettlePolicy;
+ }
+
+ /**
* @return the currently configured client ID prefix for auto-generated client IDs.
*/
public synchronized String getClientIDPrefix() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 600f26e..84e4017 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -40,7 +40,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected final JmsSession session;
protected final JmsConnection connection;
protected JmsProducerInfo producerInfo;
- protected final boolean flexibleDestination;
+ protected final boolean anonymousProducer;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected int priority = Message.DEFAULT_PRIORITY;
protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
@@ -53,9 +53,10 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException {
this.session = session;
this.connection = session.getConnection();
- this.flexibleDestination = destination == null;
+ this.anonymousProducer = destination == null;
this.producerInfo = new JmsProducerInfo(producerId);
this.producerInfo.setDestination(destination);
+ this.producerInfo.setPresettle(session.getPresettlePolicy().isSendPresttled(destination, session));
session.getConnection().createResource(producerInfo);
}
@@ -141,7 +142,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosed();
- if (flexibleDestination) {
+ if (anonymousProducer) {
throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
}
@@ -157,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
checkClosed();
- if (!flexibleDestination) {
+ if (!anonymousProducer) {
throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
}
@@ -231,6 +232,14 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
}
}
+ protected boolean isPresettled() {
+ return producerInfo.isPresettle();
+ }
+
+ protected boolean isAnonymous() {
+ return anonymousProducer;
+ }
+
////////////////////////////////////////////////////////////////////////////
// Connection interruption handlers.
////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
new file mode 100644
index 0000000..c6079d6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Policy object that allows for configuration of options that affect when
+ * a JMS MessageProducer will result in AMQP presettled message sends.
+ */
+public class JmsPresettlePolicy {
+
+ private boolean presettleAll;
+ private boolean presettleProducers;
+ private boolean presettleTopicProducers;
+ private boolean presettleQueueProducers;
+ private boolean presettleTransactedProducers;
+
+ public JmsPresettlePolicy() {
+ }
+
+ public JmsPresettlePolicy(JmsPresettlePolicy source) {
+ this.presettleAll = source.presettleAll;
+ this.presettleProducers = source.presettleProducers;
+ this.presettleTopicProducers = source.presettleTopicProducers;
+ this.presettleQueueProducers = source.presettleQueueProducers;
+ this.presettleTransactedProducers = source.presettleTransactedProducers;
+ }
+
+ public JmsPresettlePolicy copy() {
+ return new JmsPresettlePolicy(this);
+ }
+
+ /**
+ * @return the presettleAll setting for this policy
+ */
+ public boolean isPresettleAll() {
+ return presettleAll;
+ }
+
+ /**
+ * Sets the presettle all sends option. When true all MessageProducers
+ * will send their messages presettled.
+ *
+ * @param presettleAll
+ * the presettleAll value to apply.
+ */
+ public void setPresettleAll(boolean presettleAll) {
+ this.presettleAll = presettleAll;
+ }
+
+ /**
+ * @return the presettleProducers setting for this policy.
+ */
+ public boolean isPresettleProducers() {
+ return presettleProducers;
+ }
+
+ /**
+ * Sets the presettle producers option. When true all MessageProducers that
+ * are created will send their messages as settled.
+ *
+ * @param presettleProducers
+ * the presettleProducers value to apply.
+ */
+ public void setPresettleProducers(boolean presettleProducers) {
+ this.presettleProducers = presettleProducers;
+ }
+
+ /**
+ * @return the presettleTopicProducers setting for this policy
+ */
+ public boolean isPresettleTopicProducers() {
+ return presettleTopicProducers;
+ }
+
+ /**
+ * Sets the presettle Topic sends option. When true any MessageProducer that
+ * is created that sends to a Topic will send its messages presettled, and any
+ * anonymous MessageProducer will send Messages that are sent to a Topic as
+ * presettled as well.
+ *
+ * @param presettleTopicProducers
+ * the presettleTopicProducers value to apply.
+ */
+ public void setPresettleTopicProducers(boolean presettleTopicProducers) {
+ this.presettleTopicProducers = presettleTopicProducers;
+ }
+
+ /**
+ * @return the presettleQueueProducers setting for this policy
+ */
+ public boolean isPresettleQueueProducers() {
+ return presettleQueueProducers;
+ }
+
+ /**
+ * Sets the presettle Queue sends option. When true any MessageProducer that
+ * is created that sends to a Queue will send its messages presettled, and any
+ * anonymous MessageProducer will send Messages that are sent to a Queue as
+ * presettled as well.
+ *
+ * @param presettleQueueProducers
+ * the presettleQueueProducers value to apply.
+ */
+ public void setPresettleQueueProducers(boolean presettleQueueProducers) {
+ this.presettleQueueProducers = presettleQueueProducers;
+ }
+
+ /**
+ * @return the presettleTransactedProducers setting for this policy
+ */
+ public boolean isPresettleTransactedProducers() {
+ return presettleTransactedProducers;
+ }
+
+ /**
+ * Sets the presettle in transactions option. When true any MessageProducer that is
+ * operating inside of a transacted session will send its messages presettled.
+ *
+ * @param presettleTransactedProducers the presettleTransactedProducers value to apply.
+ */
+ public void setPresettleTransactedProducers(boolean presettleTransactedProducers) {
+ this.presettleTransactedProducers = presettleTransactedProducers;
+ }
+
+ /**
+ * Determines whether a producer will send its messages presettled.
+ * <p>
+ * Called when a producer is being created to determine whether the producer will
+ * be configured to send all its messages as presettled or not.
+ * <p>
+ * For an anonymous producer this method is called on each send to allow the policy to
+ * be applied to the target destination that the message will be sent to.
+ *
+ * @param destination
+ * the destination that the producer will be sending to.
+ * @param session
+ * the session that owns the producer that will be sending a message.
+ *
+ * @return true if the producer should send presettled.
+ */
+ public boolean isSendPresttled(JmsDestination destination, JmsSession session) {
+
+ if (presettleAll || presettleProducers) {
+ return true;
+ } else if (session.isTransacted() && presettleTransactedProducers) {
+ return true;
+ } else if (destination != null && destination.isQueue() && presettleQueueProducers) {
+ return true;
+ } else if (destination != null && destination.isTopic() && presettleTopicProducers) {
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
index 395b982..c57e238 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java
@@ -59,4 +59,34 @@ public class JmsRedeliveryPolicy {
public void setMaxRedeliveries(int maxRedeliveries) {
this.maxRedeliveries = maxRedeliveries;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + maxRedeliveries;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ JmsRedeliveryPolicy other = (JmsRedeliveryPolicy) obj;
+ if (maxRedeliveries != other.maxRedeliveries) {
+ return false;
+ }
+
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index c00cdbd..a3e2b27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -93,7 +93,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final AtomicBoolean started = new AtomicBoolean();
private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
- private JmsPrefetchPolicy prefetchPolicy;
+ private final JmsPrefetchPolicy prefetchPolicy;
+ private final JmsPresettlePolicy presettlePolicy;
private final JmsMessageIDBuilder messageIDBuilder;
private final JmsSessionInfo sessionInfo;
private volatile ExecutorService executor;
@@ -108,7 +109,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
- this.prefetchPolicy = new JmsPrefetchPolicy(connection.getPrefetchPolicy());
+ this.prefetchPolicy = connection.getPrefetchPolicy().copy();
+ this.presettlePolicy = connection.getPresettlePolicy().copy();
this.messageIDBuilder = connection.getMessageIDBuilder();
if (acknowledgementMode == SESSION_TRANSACTED) {
@@ -696,6 +698,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
envelope.setSendAsync(!sync);
envelope.setDispatchId(messageSequence);
+ if (producer.isAnonymous()) {
+ envelope.setPresettle(presettlePolicy.isSendPresttled(destination, this));
+ }
+
transactionContext.send(connection, envelope);
} finally {
sendLock.unlock();
@@ -910,8 +916,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return prefetchPolicy;
}
- public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
- this.prefetchPolicy = prefetchPolicy;
+ public JmsPresettlePolicy getPresettlePolicy() {
+ return presettlePolicy;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index d97d33d..3af306d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -20,7 +20,9 @@ import java.net.URI;
import java.nio.charset.Charset;
import org.apache.qpid.jms.JmsPrefetchPolicy;
+import org.apache.qpid.jms.JmsPresettlePolicy;
import org.apache.qpid.jms.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.message.JmsMessageIDBuilder;
/**
* Meta object that contains the JmsConnection identification and configuration
@@ -59,6 +61,8 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy();
+ private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy();
+ private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder();
private volatile byte[] encodedUserId;
@@ -89,6 +93,10 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
copy.topicPrefix = topicPrefix;
copy.connectTimeout = connectTimeout;
copy.validatePropertyNames = validatePropertyNames;
+ copy.messageIDBuilder = messageIDBuilder;
+ copy.prefetchPolicy = prefetchPolicy.copy();
+ copy.redeliveryPolicy = redeliveryPolicy.copy();
+ copy.presettlePolicy = presettlePolicy.copy();
}
public boolean isForceAsyncSend() {
@@ -264,6 +272,22 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
this.redeliveryPolicy = redeliveryPolicy.copy();
}
+ public JmsPresettlePolicy getPresettlePolicy() {
+ return presettlePolicy;
+ }
+
+ public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) {
+ this.presettlePolicy = presettlePolicy;
+ }
+
+ public JmsMessageIDBuilder getMessageIDBuilder() {
+ return messageIDBuilder;
+ }
+
+ public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) {
+ this.messageIDBuilder = messageIDBuilder;
+ }
+
public boolean isPopulateJMSXUserID() {
return populateJMSXUserID;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
index 53939a9..8b1e019 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java
@@ -22,6 +22,7 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce
private final JmsProducerId producerId;
private JmsDestination destination;
+ private boolean presettle;
public JmsProducerInfo(JmsProducerId producerId) {
if (producerId == null) {
@@ -66,6 +67,24 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce
this.destination = destination;
}
+ /**
+ * @return the presettle mode of this producer.
+ */
+ public boolean isPresettle() {
+ return presettle;
+ }
+
+ /**
+ * Sets the presettle mode of the producer. When true the producer will be created
+ * as a presettled producer and all messages it sends will be settled before dispatch.
+ *
+ * @param presettle
+ * the presettle option to set on this producer.
+ */
+ public void setPresettle(boolean presettle) {
+ this.presettle = presettle;
+ }
+
@Override
public String toString() {
return "JmsProducerInfo { " + getId() + ", destination = " + getDestination() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index a115f72..71f26e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -45,7 +45,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class);
private static final IdGenerator producerIdGenerator = new IdGenerator();
- private final AnonymousProducerCache producerCache = new AnonymousProducerCache(10);
+ private final AnonymousProducerCache producerCache;
private final String producerIdKey = producerIdGenerator.generateId();
private long producerIdCount;
@@ -61,7 +61,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
super(session, info);
if (connection.isAnonymousProducerCache()) {
+ producerCache = new AnonymousProducerCache(10);
producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize());
+ } else {
+ producerCache = null;
}
}
@@ -79,6 +82,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer {
// send to the given AMQP target.
JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
info.setDestination(envelope.getDestination());
+ info.setPresettle(this.getResourceInfo().isPresettle());
// We open a Fixed Producer instance with the target destination. Once it opens
// it will trigger the open event which will in turn trigger the send event.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index a7e818c..f284c8a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -119,6 +119,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
return remoteURI;
}
+ @Override
public AmqpProvider getProvider() {
return provider;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index ef526c9..48e9d72 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -43,6 +43,7 @@ import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
@@ -63,7 +64,6 @@ public class AmqpFixedProducer extends AmqpProducer {
private final Set<Delivery> sent = new LinkedHashSet<Delivery>();
private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>();
private byte[] encodeBuffer = new byte[1024 * 8];
- private boolean presettle = false;
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
super(session, info);
@@ -287,13 +287,8 @@ public class AmqpFixedProducer extends AmqpProducer {
}
@Override
- public void setPresettle(boolean presettle) {
- this.presettle = presettle;
- }
-
- @Override
public boolean isPresettle() {
- return presettle;
+ return getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED;
}
public long getSendTimeout() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
index 74fe18d..0e0f3f4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java
@@ -66,7 +66,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
Sender sender = getParent().getEndpoint().sender(senderName);
sender.setSource(source);
sender.setTarget(target);
- if (getParent().getConnection().isPresettleProducers()) {
+ if (resourceInfo.isPresettle() || getParent().getConnection().isPresettleProducers()) {
sender.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
@@ -78,13 +78,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS
@Override
protected AmqpProducer createResource(AmqpSession parent, JmsProducerInfo resourceInfo, Sender endpoint) {
- AmqpProducer producer = new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint);
-
- if (getParent().getConnection().isPresettleProducers()) {
- producer.setPresettle(true);
- }
-
- return producer;
+ return new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
new file mode 100644
index 0000000..e7b70ef
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.ListDescribedType;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transaction.TxnCapability;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+/**
+ * Test MessageProducers created using various configurations of the presettle options
+ */
+public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY };
+
+ //----- Test the jms.presettlePolicy.presettleAll option -----------------//
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllSendToTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllSendToQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllSendToTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllSendToTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllAnonymousSendToTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllAnonymousSendToQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleAll=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, true);
+ }
+ }
+
+ //----- Test the amqp.presettleProducers option --------------------------//
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTopic() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToQueue() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception {
+ final String options = "?amqp.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, true);
+ }
+ }
+
+ //----- Test the jms.presettlePolicy.presettleProducers option -----------//
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersAnonymousTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersAnonymousQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersAnonymousTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleProducersAnonymousTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, true, true, false, true);
+ }
+ }
+
+ //----- Test the jms.presettlePolicy.presettleTopicProducers option ------//
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=false, transferSettled=false, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=false, transferSettled=false, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersAnonymousQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=false, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, false, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTopicProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=false, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, false, false, true);
+ }
+ }
+
+ //----- Test the jms.presettlePolicy.presettleQueueProducers option ------//
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=false, transferSettled=false, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=false, transferSettled=false, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=false, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, false, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersAnonymousQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=false, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, false, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleQueueProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options, serverCapabilities, null);
+ // Flags: transacted=false, anonymous=true, senderSettled=false, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, true, false, true, false, true);
+ }
+ }
+
+ //----- Test the jms.presettlePolicy.presettleTransactedProducers option -//
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=true, senderSettled=true, transferSettled=true, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=true, senderSettled=true, transferSettled=true, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTempTopic() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=true, senderSettled=true, transferSettled=true, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTempQueue() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=true, senderSettled=true, transferSettled=true, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, true, true, true, false, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTopicNoTX() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=false, senderSettled=false, transferSettled=false, topic=true, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, true, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersQueueNoTX() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=false, senderSettled=false, transferSettled=false, topic=false, temporary=false.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, false, false);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTempTopicNoTX() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=false, senderSettled=false, transferSettled=false, topic=true, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, true, true);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testJmsPresettlePolicyPresettleTransactedProducersTempQueueNoTX() throws Exception {
+ final String options = "?jms.presettlePolicy.presettleTransactedProducers=true";
+ try (TestAmqpPeer peer = new TestAmqpPeer()) {
+ Connection jmsConnection = testFixture.establishConnecton(peer, options);
+ // Flags: transacted=false, senderSettled=false, transferSettled=false, topic=false, temporary=true.
+ doTestProducerWithPresettleOptions(peer, jmsConnection, false, false, false, false, true);
+ }
+ }
+
+ //----- Test Method implementation ---------------------------------------//
+
+ private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+ doTestProducerWithPresettleOptions(testPeer, connection, false, senderSettled, transferSettled, topic, temporary);
+ }
+
+ private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+ doTestProducerWithPresettleOptions(testPeer, connection, transacted, false, senderSettled, transferSettled, topic, temporary);
+ }
+
+ /**
+ * Drives a single producer scenario against the scripted test peer: creates a
+ * session, a destination and a producer, sends one message, then closes the
+ * connection and waits for the peer to finish processing its expectations.
+ *
+ * @param testPeer the scripted AMQP peer on which the expected frames are registered
+ * @param connection an already established connection to {@code testPeer}; closed by this method
+ * @param transacted whether the session is created as a transacted session
+ * @param anonymous whether the producer is created without a destination, which is then supplied on send
+ * @param senderSettled whether the producer's sender link is expected to attach as settled
+ * @param transferSettled whether the message transfer is expected to arrive already settled
+ * @param topic {@code true} to send to a topic, {@code false} to send to a queue
+ * @param temporary whether the destination is a temporary one created from the session
+ * @throws Exception if any JMS operation or peer interaction fails
+ */
+ private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception {
+ testPeer.expectBegin();
+
+ Session session = null;
+ Binary txnId = null;
+
+ if (transacted) {
+ // Expect the session, with an immediate link to the transaction coordinator
+ // using a target with the expected capabilities only.
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ testPeer.expectDeclare(txnId);
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ } else {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ // Temporary destinations require an extra dynamic-node attach on the peer,
+ // so that expectation is registered before the destination is created.
+ Destination destination = null;
+ if (topic) {
+ if (temporary) {
+ String dynamicAddress = "myTempTopicAddress";
+ testPeer.expectTempTopicCreationAttach(dynamicAddress);
+ destination = session.createTemporaryTopic();
+ } else {
+ destination = session.createTopic("myTopic");
+ }
+ } else {
+ if (temporary) {
+ String dynamicAddress = "myTempQueueAddress";
+ testPeer.expectTempQueueCreationAttach(dynamicAddress);
+ destination = session.createTemporaryQueue();
+ } else {
+ destination = session.createQueue("myQueue");
+ }
+ }
+
+ if (senderSettled) {
+ testPeer.expectSettledSenderAttach();
+ } else {
+ testPeer.expectSenderAttach();
+ }
+
+ MessageProducer producer = null;
+ if (anonymous) {
+ producer = session.createProducer(null);
+ } else {
+ producer = session.createProducer(destination);
+ }
+
+ // Create and transfer a new message
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ headersMatcher.withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ // A transacted send must carry the transaction id and no outcome in its state.
+ Matcher<?> stateMatcher = nullValue();
+ if (transacted) {
+ stateMatcher = new TransactionalStateMatcher();
+ ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId));
+ ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue());
+ }
+
+ // The peer's response to a transacted send wraps the accepted outcome in a
+ // transactional state; otherwise a plain accepted outcome is used.
+ ListDescribedType responseState = new Accepted();
+ if (transacted) {
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+ responseState = txState;
+ }
+
+ if (transferSettled) {
+ testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false);
+ } else {
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+ }
+
+ Message message = session.createTextMessage();
+
+ if (anonymous) {
+ producer.send(destination, message);
+ } else {
+ producer.send(message);
+ }
+
+ if (transacted) {
+ testPeer.expectDischarge(txnId, true);
+ }
+
+ testPeer.expectClose();
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 9625cd1..2e49080 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -1602,36 +1602,4 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
-
- @Test(timeout = 20000)
- public void testPresettledProducersConfigurationApplied() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true");
- testPeer.expectBegin();
- testPeer.expectSettledSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- // Create and transfer a new message
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- testPeer.expectTransfer(messageMatcher, nullValue(), true, false, null, false);
- testPeer.expectClose();
-
- Message message = session.createTextMessage();
-
- producer.send(message);
- assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
-
- connection.close();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org