You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2010/01/26 10:02:28 UTC
svn commit: r903130 - in
/servicemix/components/bindings/servicemix-jms/trunk/src:
main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Author: gertv
Date: Tue Jan 26 09:02:28 2010
New Revision: 903130
URL: http://svn.apache.org/viewvc?rev=903130&view=rev
Log:
SMXCOMP-702: Allow per-message specification of JMS priority and time-to-live
Modified:
servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java?rev=903130&r1=903129&r2=903130&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java Tue Jan 26 09:02:28 2010
@@ -106,6 +106,8 @@
private Store store;
private AbstractMessageListenerContainer listenerContainer;
+
+ private boolean preserveMessageQos;
/**
* @return the destination
@@ -470,6 +472,24 @@
public void setReplyDestinationName(String replyDestinationName) {
this.replyDestinationName = replyDestinationName;
}
+
+ /**
+ * @return the preserveMessageQos
+ */
+ public boolean isPreserveMessageQos() {
+ return preserveMessageQos;
+ }
+
+ /**
+ * Specifies whether messages should be sent using the QoS settings (delivery mode,
+ * priority and time-to-live) carried by the message itself, so that per-message QoS is preserved.
+ * The default is <code>false</code>.
+ *
+ * @param preserveMessageQos <code>true</code> to send each message with its own QoS settings
+ */
+ public void setPreserveMessageQos(boolean preserveMessageQos) {
+ this.preserveMessageQos = preserveMessageQos;
+ }
/**
* Process the incoming JBI exchange
@@ -976,7 +996,8 @@
return cont;
}
- public static class JmsTemplateUtil extends JmsTemplate {
+ public class JmsTemplateUtil extends JmsTemplate {
+
public void send(Session session, Destination destination, MessageCreator messageCreator) throws JmsException {
try {
doSend(session, destination, messageCreator);
@@ -991,9 +1012,38 @@
throw convertJmsAccessException(ex);
}
}
+
+ /**
+ * Overridden so we can support preserving the QoS settings that have
+ * been set on the message.
+ */
+ protected void doSend(MessageProducer producer, Message message) throws JMSException {
+ if (isPreserveMessageQos()) {
+ producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), getTimeToLive(message));
+ } else {
+ super.doSend(producer, message);
+ }
+ }
+
+ /**
+ * Determine the remaining time-to-live (in milliseconds) for a JMS Message
+ * @return 0 if no expiration has been set, otherwise the remaining ttl (at least 1, even if already expired)
+ */
+ protected long getTimeToLive(Message message) throws JMSException {
+ long ttl = message.getJMSExpiration();
+ if (ttl != 0) {
+ ttl = ttl - System.currentTimeMillis();
+ // Message has already expired, so use the smallest positive ttl (0 would mean "no ttl set")
+ if (ttl <= 0) {
+ ttl = 1;
+ }
+ }
+ return ttl;
+ }
}
- public static class JmsTemplate102Util extends JmsTemplateUtil {
+ public class JmsTemplate102Util extends JmsTemplateUtil {
+
protected void initDefaultStrategies() {
setMessageConverter(new SimpleMessageConverter102());
}
@@ -1064,17 +1114,26 @@
}
protected void doSend(MessageProducer producer, Message message) throws JMSException {
- if (isPubSubDomain()) {
- if (isExplicitQosEnabled()) {
- ((TopicPublisher) producer).publish(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ if (isPreserveMessageQos()) {
+ long ttl = getTimeToLive(message);
+ if (isPubSubDomain()) {
+ ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
} else {
- ((TopicPublisher) producer).publish(message);
+ ((QueueSender) producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
}
} else {
- if (isExplicitQosEnabled()) {
- ((QueueSender) producer).send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ if (isPubSubDomain()) {
+ if (isExplicitQosEnabled()) {
+ ((TopicPublisher) producer).publish(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ } else {
+ ((TopicPublisher) producer).publish(message);
+ }
} else {
- ((QueueSender) producer).send(message);
+ if (isExplicitQosEnabled()) {
+ ((QueueSender) producer).send(message, getDeliveryMode(), getPriority(), getTimeToLive());
+ } else {
+ ((QueueSender) producer).send(message);
+ }
}
}
}
Modified: servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java?rev=903130&r1=903129&r2=903130&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java (original)
+++ servicemix/components/bindings/servicemix-jms/trunk/src/test/java/org/apache/servicemix/jms/JmsProviderEndpointTest.java Tue Jan 26 09:02:28 2010
@@ -48,11 +48,14 @@
*/
private static final String MSG_PROPERTY = "PropertyTest";
private static final String MSG_PROPERTY_BLACKLISTED = "BadPropertyTest";
+
+ private static final String MSG_PRIORITY_PROPERTY = "JMSPriority";
+ private static final String MSG_EXPIRATION_PROPERTY = "JMSExpiration";
protected List<String> blackList;
public void testSendWithoutProperties() throws Exception {
- container.activateComponent(createEndpoint(false), "servicemix-jms");
+ container.activateComponent(createEndpoint(false, false), "servicemix-jms");
InOnly me = client.createInOnlyExchange();
NormalizedMessage inMessage = me.getInMessage();
@@ -94,6 +97,26 @@
assertNotNull(msg);
}
+ public void testProviderInOnlyWithMessageQoS() throws Exception {
+ container.activateComponent(createEndpoint(true, true), "servicemix-jms");
+
+ InOnly me = client.createInOnlyExchange();
+ NormalizedMessage inMessage = me.getInMessage();
+ inMessage.setProperty(MSG_PRIORITY_PROPERTY, 2);
+ long ttl = System.currentTimeMillis() + 5000;
+ inMessage.setProperty(MSG_EXPIRATION_PROPERTY, ttl);
+ inMessage.setContent(new StringSource("<hello>world</hello>"));
+ me.setService(new QName("jms"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+
+ Message msg = jmsTemplate.receive("destination");
+ assertNotNull(msg);
+ assertEquals(2, msg.getJMSPriority());
+ assertTrue("Found none-zero value for JMSExpiration", msg
+ .getLongProperty(MSG_EXPIRATION_PROPERTY) != 0);
+ }
+
public void testProviderInOnlyWithoutReplyDest() throws Exception {
JmsComponent component = new JmsComponent();
@@ -290,10 +313,10 @@
// Helper methods
private JmsComponent createEndpoint() {
- return createEndpoint(true);
+ return createEndpoint(true, false);
}
- private JmsComponent createEndpoint(boolean copyProperties) {
+ private JmsComponent createEndpoint(boolean copyProperties, boolean preserveMsgQoS) {
// initialize the black list
blackList = new LinkedList<String>();
blackList.add(MSG_PROPERTY_BLACKLISTED);
@@ -308,6 +331,7 @@
endpoint.setEndpoint("endpoint");
endpoint.setConnectionFactory(connectionFactory);
endpoint.setDestinationName("destination");
+ endpoint.setPreserveMessageQos(preserveMsgQoS);
component.setEndpoints(new JmsProviderEndpoint[] {endpoint});
return component;
}