You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 22:24:48 UTC
svn commit: r1172799 - in /camel/branches/camel-2.8.x: ./
components/camel-jms/src/main/java/org/apache/camel/component/jms/
components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
components/camel-jms/src/test/java/org/apache/camel/co...
Author: dkulp
Date: Mon Sep 19 20:24:47 2011
New Revision: 1172799
URL: http://svn.apache.org/viewvc?rev=1172799&view=rev
Log:
Merged revisions 1157475,1158206,1158281 via svnmerge from
https://svn.apache.org/repos/asf/camel/trunk
........
r1157475 | davsclaus | 2011-08-14 02:48:25 -0400 (Sun, 14 Aug 2011) | 1 line
CAMEL-4202: Added ReplyToType option. Added new Exclusive replyToType to support per producer exclusive persistent reply queues for request/reply over JMS. This allows to run faster than the Shared option.
........
r1158206 | davsclaus | 2011-08-16 07:24:27 -0400 (Tue, 16 Aug 2011) | 1 line
CAMEL-4202: ReplyToType now compliant with JMX
........
r1158281 | davsclaus | 2011-08-16 10:08:19 -0400 (Tue, 16 Aug 2011) | 1 line
CAMEL-4202: ReplyToType now compliant with JMX
........
Added:
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/ReplyToType.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/ReplyToType.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToComponentTest.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToComponentTest.java
camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplySharedReplyToTest.java
- copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplySharedReplyToTest.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
svn:mergeinfo = /camel/trunk:1157475,1158206,1158281
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Mon Sep 19 20:24:47 2011
@@ -334,6 +334,14 @@ public class JmsComponent extends Defaul
getConfiguration().setDestinationResolver(destinationResolver);
}
+ public ReplyToType getReplyToType() {
+ return configuration.getReplyToType();
+ }
+
+ public void setReplyToType(ReplyToType replyToType) {
+ configuration.setReplyToType(replyToType);
+ }
+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Sep 19 20:24:47 2011
@@ -124,6 +124,7 @@ public class JmsConfiguration implements
private boolean forceSendOriginalMessage;
// to force disabling time to live (works in both in-only or in-out mode)
private boolean disableTimeToLive;
+ private ReplyToType replyToType;
public JmsConfiguration() {
}
@@ -1176,4 +1177,19 @@ public class JmsConfiguration implements
public void setDisableTimeToLive(boolean disableTimeToLive) {
this.disableTimeToLive = disableTimeToLive;
}
+
+ /**
+ * Gets the reply to type.
+ * <p/>
+ * Will only return a value if this option has been explicitly configured.
+ *
+ * @return the reply type if configured, otherwise <tt>null</tt>
+ */
+ public ReplyToType getReplyToType() {
+ return replyToType;
+ }
+
+ public void setReplyToType(ReplyToType replyToType) {
+ this.replyToType = replyToType;
+ }
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Sep 19 20:24:47 2011
@@ -983,6 +983,21 @@ public class JmsEndpoint extends Default
configuration.setDisableTimeToLive(disableTimeToLive);
}
+ @ManagedAttribute
+ public String getReplyToType() {
+ if (configuration.getReplyToType() != null) {
+ return configuration.getReplyToType().name();
+ } else {
+ return null;
+ }
+ }
+
+ @ManagedAttribute
+ public void setReplyToType(String replyToType) {
+ ReplyToType type = ReplyToType.valueOf(replyToType);
+ configuration.setReplyToType(type);
+ }
+
@ManagedAttribute(description = "Camel id")
public String getCamelId() {
return getCamelContext().getName();
@@ -1022,6 +1037,4 @@ public class JmsEndpoint extends Default
return super.createEndpointUri();
}
-
-
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Sep 19 20:24:47 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -67,6 +66,15 @@ public class JmsProducer extends Default
return;
}
try {
+ // validate that replyToType and replyTo are configured accordingly
+ if (endpoint.getReplyToType() != null) {
+ // setting temporary with a fixed replyTo is not supported
+ if (endpoint.getReplyTo() != null && endpoint.getReplyToType().equals(ReplyToType.Temporary.name())) {
+ throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary
+ + " is not supported when replyTo " + endpoint.getReplyTo() + " is also configured.");
+ }
+ }
+
if (endpoint.getReplyTo() != null) {
replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from: " + endpoint.getReplyTo());
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Sep 19 20:24:47 2011
@@ -25,6 +25,7 @@ import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.ReplyToType;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
@@ -124,45 +125,35 @@ public class PersistentQueueReplyManager
}
};
- private final class PersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
-
- private String fixedMessageSelector;
- private MessageSelectorCreator creator;
-
- private PersistentQueueMessageListenerContainer(String fixedMessageSelector) {
- this.fixedMessageSelector = fixedMessageSelector;
- }
-
- private PersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
- this.creator = creator;
- }
-
- @Override
- public String getMessageSelector() {
- String id = null;
- if (fixedMessageSelector != null) {
- id = fixedMessageSelector;
- } else if (creator != null) {
- id = creator.get();
- }
- log.trace("Using MessageSelector[{}]", id);
- return id;
- }
- }
-
protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
DefaultMessageListenerContainer answer;
- String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
- if (replyToSelectorName != null) {
- // create a random selector value we will use for the persistent reply queue
- replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
- String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
- answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
+ ReplyToType type = endpoint.getConfiguration().getReplyToType();
+ if (type == null) {
+ // use shared by default for persistent reply queues
+ type = ReplyToType.Shared;
+ }
+
+ if (ReplyToType.Shared == type) {
+ // shared reply to queues support either a fixed or dynamic JMS message selector
+ String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+ if (replyToSelectorName != null) {
+ // create a random selector value we will use for the persistent reply queue
+ replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+ String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
+ answer = new SharedPersistentQueueMessageListenerContainer(fixedMessageSelector);
+ log.debug("Using shared queue: " + endpoint.getReplyTo() + " with fixed message selector [" + fixedMessageSelector + "] as reply listener: " + answer);
+ } else {
+ // use a dynamic message selector which will select the message we want to receive as reply
+ dynamicMessageSelector = new MessageSelectorCreator();
+ answer = new SharedPersistentQueueMessageListenerContainer(dynamicMessageSelector);
+ log.debug("Using shared queue: " + endpoint.getReplyTo() + " with dynamic message selector as reply listener: " + answer);
+ }
+ } else if (ReplyToType.Exclusive == type) {
+ answer = new ExclusivePersistentQueueMessageListenerContainer();
+ log.debug("Using exclusive queue:" + endpoint.getReplyTo() + " as reply listener: " + answer);
} else {
- // use a dynamic message selector which will select the message we want to receive as reply
- dynamicMessageSelector = new MessageSelectorCreator();
- answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
+ throw new IllegalArgumentException("ReplyToType " + type + " is not supported for persistent reply queues");
}
DestinationResolver resolver = endpoint.getDestinationResolver();