You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/11/13 12:40:30 UTC
svn commit: r594492 - in /activemq/camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: jstrachan
Date: Tue Nov 13 03:40:26 2007
New Revision: 594492
URL: http://svn.apache.org/viewvc?rev=594492&view=rev
Log:
added fix for https://issues.apache.org/activemq/browse/CAMEL-212 along with test case
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=594492&r1=594491&r2=594492&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Tue Nov 13 03:40:26 2007
@@ -35,7 +35,7 @@
* A JMS {@link MessageListener} which can be used to delegate processing to a
* Camel endpoint.
*
- * @version $Revision$
+ * @version $Revision$ ;';;;
*/
public class EndpointMessageListener implements MessageListener {
private static final transient Log LOG = LogFactory.getLog(EndpointMessageListener.class);
@@ -50,6 +50,7 @@
public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
this.processor = processor;
+ endpoint.getConfiguration().configure(this);
}
public void onMessage(final Message message) {
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=594492&r1=594491&r2=594492&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Tue Nov 13 03:40:26 2007
@@ -94,6 +94,8 @@
private int transactionTimeout = -1;
private boolean preserveMessageQos;
private long requestMapPurgePollTimeMillis = 1000L;
+ private boolean disableReplyTo;
+ private boolean eagerLoadingOfProperties;
public JmsConfiguration() {
}
@@ -348,6 +350,16 @@
}
}
+
+ public void configure(EndpointMessageListener listener) {
+ if (isDisableReplyTo()) {
+ listener.setDisableReplyTo(true);
+ }
+ if (isEagerLoadingOfProperties()) {
+ listener.setEagerLoadingOfProperties(true);
+ }
+ }
+
// Properties
// -------------------------------------------------------------------------
public ConnectionFactory getConnectionFactory() {
@@ -669,6 +681,36 @@
public void setTransacted(boolean consumerTransacted) {
this.transacted = consumerTransacted;
+ }
+
+ public boolean isEagerLoadingOfProperties() {
+ return eagerLoadingOfProperties;
+ }
+
+ /**
+ * Enables eager loading of JMS properties as soon as a message is loaded. This is generally
+ * inefficient, as the JMS properties may not be required, but it can sometimes catch issues
+ * with the underlying JMS provider and the use of JMS properties early.
+ *
+ * @param eagerLoadingOfProperties whether or not to enable eager loading of JMS properties
+ * on inbound messages
+ */
+ public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
+ this.eagerLoadingOfProperties = eagerLoadingOfProperties;
+ }
+
+ public boolean isDisableReplyTo() {
+ return disableReplyTo;
+ }
+
+ /**
+ * Disables the use of the JMSReplyTo header for consumers so that inbound messages are treated as InOnly
+ * rather than InOut requests.
+ *
+ * @param disableReplyTo whether or not to disable the use of the JMSReplyTo header indicating an InOut request
+ */
+ public void setDisableReplyTo(boolean disableReplyTo) {
+ this.disableReplyTo = disableReplyTo;
}
// Implementation methods
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=594492&r1=594491&r2=594492&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Tue Nov 13 03:40:26 2007
@@ -30,6 +30,12 @@
* @version $Revision: $
*/
public class JmsEndpointConfigurationTest extends ContextTestSupport {
+ private Processor dummyProcessor = new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ log.info("Received: " + exchange);
+ }
+ };
+
public void testDurableSubscriberConfiguredWithDoubleSlash() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms://topic:Foo.Bar?durableSubscriptionName=James&clientId=ABC");
assertDurableSubscriberEndpointIsValid(endpoint);
@@ -42,13 +48,26 @@
public void testSelector() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar?selector=foo%3D'ABC'");
- JmsConsumer consumer = endpoint.createConsumer(new Processor() {
- public void process(Exchange exchange) throws Exception {
- log.info("Received: " + exchange);
- }
- });
+ JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+
AbstractMessageListenerContainer container = consumer.getListenerContainer();
assertEquals("selector", "foo='ABC'", container.getMessageSelector());
+
+ Object object = container.getMessageListener();
+ EndpointMessageListener messageListener = assertIsInstanceOf(EndpointMessageListener.class, object);
+ assertFalse("Should not have replyToDisabled", messageListener.isDisableReplyTo());
+ assertFalse("Should not have isEagerLoadingOfProperties()", messageListener.isEagerLoadingOfProperties());
+ }
+
+ public void testConfigureMessageListener() throws Exception {
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar?disableReplyTo=true&eagerLoadingOfProperties=true");
+ JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+
+ AbstractMessageListenerContainer container = consumer.getListenerContainer();
+ Object object = container.getMessageListener();
+ EndpointMessageListener messageListener = assertIsInstanceOf(EndpointMessageListener.class, object);
+ assertTrue("Should have replyToDisabled", messageListener.isDisableReplyTo());
+ assertTrue("Should have isEagerLoadingOfProperties()", messageListener.isEagerLoadingOfProperties());
}
protected void assertDurableSubscriberEndpointIsValid(JmsEndpoint endpoint) throws Exception {