You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/06/29 09:01:56 UTC

svn commit: r789236 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/JmsBinding.java main/java/org/apache/camel/component/jms/JmsProducer.java test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java

Author: davsclaus
Date: Mon Jun 29 07:01:56 2009
New Revision: 789236

URL: http://svn.apache.org/viewvc?rev=789236&view=rev
Log:
CAMEL-1689: JmsProducer's process method is a bit easier to read now that it is divided into separate InOut and InOnly methods.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java   (with props)
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=789236&r1=789235&r2=789236&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Mon Jun 29 07:01:56 2009
@@ -265,9 +265,9 @@
                 jmsMessage.setJMSReplyTo(ExchangeHelper.convertToType(exchange, Destination.class, headerValue));
             } else if (headerName.equals("JMSType")) {
                 jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
-            } else if (LOG.isDebugEnabled()) {
+            } else if (LOG.isTraceEnabled()) {
                 // The following properties are set by the MessageProducer:
-                // JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriorit
+                // JMSDeliveryMode, JMSDestination, JMSExpiration, JMSPriority
                 // The following are set on the underlying JMS provider:
                 // JMSMessageID, JMSTimestamp, JMSRedelivered
                 // log at trace level to not spam log

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=789236&r1=789235&r2=789236&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun 29 07:01:56 2009
@@ -142,6 +142,16 @@
     }
 
     public void process(final Exchange exchange) {
+        if (exchange.getPattern().isOutCapable()) {
+            // in out requires a bit more work than in only
+            processInOut(exchange);
+        } else {
+            // in only
+            processInOnly(exchange);
+        }
+    }
+
+    protected void processInOut(final Exchange exchange) {
         final org.apache.camel.Message in = exchange.getIn();
 
         String destinationName = endpoint.getDestinationName();
@@ -149,163 +159,174 @@
         if (destination == null) {
             destination = endpoint.getDestination();
         }
-        if (exchange.getPattern().isOutCapable()) {
 
-            testAndSetRequestor();
+        testAndSetRequestor();
 
-            // note due to JMS transaction semantics we cannot use a single transaction
-            // for sending the request and receiving the response
-            final Destination replyTo = requestor.getReplyTo();
+        // note due to JMS transaction semantics we cannot use a single transaction
+        // for sending the request and receiving the response
+        final Destination replyTo = requestor.getReplyTo();
 
-            if (replyTo == null) {
-                throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
-            }
+        if (replyTo == null) {
+            throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
+        }
 
-            final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
-            String correlationId = in.getHeader("JMSCorrelationID", String.class);
+        final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
+        String correlationId = in.getHeader("JMSCorrelationID", String.class);
 
-            if (correlationId == null && !msgIdAsCorrId) {
-                in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
-            }
+        if (correlationId == null && !msgIdAsCorrId) {
+            in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
+        }
 
-            final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
-            final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
+        final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
+        final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
 
-            MessageCreator messageCreator = new MessageCreator() {
-                public Message createMessage(Session session) throws JMSException {
-                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
-                    message.setJMSReplyTo(replyTo);
-                    requestor.setReplyToSelectorHeader(in, message);
+        MessageCreator messageCreator = new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
+                message.setJMSReplyTo(replyTo);
+                requestor.setReplyToSelectorHeader(in, message);
 
-                    FutureTask future;
-                    future = (!msgIdAsCorrId)
-                            ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
-                            : requestor.getReceiveFuture(callback);
+                FutureTask future;
+                future = (!msgIdAsCorrId)
+                        ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
+                        : requestor.getReceiveFuture(callback);
 
-                    futureHolder.set(future);
+                futureHolder.set(future);
 
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(endpoint + " sending JMS message: " + message);
-                    }
-                    return message;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(endpoint + " sending JMS message: " + message);
                 }
-            };
+                return message;
+            }
+        };
 
-            CamelJmsTemplate template = null;
-            CamelJmsTeemplate102 template102 = null;
-            if (endpoint.isUseVersion102()) {
-                template102 = (CamelJmsTeemplate102)getInOutTemplate();
+        CamelJmsTemplate template = null;
+        CamelJmsTeemplate102 template102 = null;
+        if (endpoint.isUseVersion102()) {
+            template102 = (CamelJmsTeemplate102)getInOutTemplate();
+        } else {
+            template = (CamelJmsTemplate)getInOutTemplate();
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Using JMS API " + (endpoint.isUseVersion102() ? "v1.0.2" : "v1.1"));
+        }
+
+        if (destinationName != null) {
+            if (template != null) {
+                template.send(destinationName, messageCreator, callback);
             } else {
-                template = (CamelJmsTemplate)getInOutTemplate();
+                template102.send(destinationName, messageCreator, callback);
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Using JMS API " + (endpoint.isUseVersion102() ? "v1.0.2" : "v1.1"));
+        } else if (destination != null) {
+            if (template != null) {
+                template.send(destination, messageCreator, callback);
+            } else {
+                template102.send(destination, messageCreator, callback);
             }
+        } else {
+            throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
+        }
+
+        setMessageId(exchange);
 
-            if (destinationName != null) {
-                if (template != null) {
-                    template.send(destinationName, messageCreator, callback);
+        // lets wait and return the response
+        long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
+        try {
+            Message message = null;
+            try {
+                if (requestTimeout < 0) {
+                    message = (Message)futureHolder.get().get();
                 } else {
-                    template102.send(destinationName, messageCreator, callback);
+                    message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
                 }
-            } else if (destination != null) {
-                if (template != null) {
-                    template.send(destination, messageCreator, callback);
-                } else {
-                    template102.send(destination, messageCreator, callback);
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Future interrupted: " + e, e);
+                }
+            } catch (TimeoutException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Future timed out: " + e, e);
                 }
-            } else {
-                throw new IllegalArgumentException("Neither destination nor destinationName is specified on this endpoint: " + endpoint);
             }
+            if (message != null) {
+                // the response can be an exception
+                JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+                Object body = response.getBody();
 
-            setMessageId(exchange);
-
-            // lets wait and return the response
-            long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
-            try {
-                Message message = null;
-                try {
-                    if (requestTimeout < 0) {
-                        message = (Message)futureHolder.get().get();
-                    } else {
-                        message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
-                    }
-                } catch (InterruptedException e) {
+                if (endpoint.isTransferException() && body instanceof Exception) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Future interupted: " + e, e);
+                        LOG.debug("Reply recieved. Setting reply as Exception: " + body);
                     }
-                } catch (TimeoutException e) {
+                    // we got an exception back and endpoint was configued to transfer exception
+                    // therefore set response as exception
+                    exchange.setException((Exception) body);
+                } else {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Future timed out: " + e, e);
+                        LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
                     }
+                    // regular response
+                    exchange.setOut(response);
                 }
-                if (message != null) {
-                    // the response can be an exception
-                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
-                    Object body = response.getBody();
-
-                    if (endpoint.isTransferException() && body instanceof Exception) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Reply recieved. Setting reply as Exception: " + body);
-                        }
-                        // we got an exception back and endpoint was configued to transfer exception
-                        // therefore set response as exception
-                        exchange.setException((Exception) body);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
-                        }
-                        // regular response
-                        exchange.setOut(response);
-                    }
 
-                    // correlation
-                    if (correlationId != null) {
-                        message.setJMSCorrelationID(correlationId);
-                        exchange.getOut().setHeader("JMSCorrelationID", correlationId);
-                    }
-                } else {
-                    // no response, so lets set a timed out exception
-                    exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-        } else {
-            // we must honor these special flags to preverse QoS 
-            if (!endpoint.isPreserveMessageQos() && !endpoint.isExplicitQosEnabled()) {
-                Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
-                if (replyTo != null) {
-                    // we are routing an existing JmsMessage, origin from another JMS endpoint
-                    // then we need to remove the existing JMSReplyTo
-                    // as we are not out capable and thus do not expect a reply, and therefore
-                    // the consumer of this message we send should not return a reply
-                    String to = destinationName != null ? destinationName : "" + destination;
-                    LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo + " to destination: " + to + " for Exchange: " + exchange);
-                    exchange.getIn().setHeader("JMSReplyTo", null);
+                // correlation
+                if (correlationId != null) {
+                    message.setJMSCorrelationID(correlationId);
+                    exchange.getOut().setHeader("JMSCorrelationID", correlationId);
                 }
+            } else {
+                // no response, so lets set a timed out exception
+                exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
             }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
 
-            MessageCreator messageCreator = new MessageCreator() {
-                public Message createMessage(Session session) throws JMSException {
-                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(endpoint + " sending JMS message: " + message);
-                    }
-                    return message;
-                }
-            };
-            if (destination != null) {
-                getInOnlyTemplate().send(destination, messageCreator);
-            } else if (destinationName != null) {
-                getInOnlyTemplate().send(destinationName, messageCreator);
-            } else  {
-                throw new IllegalArgumentException("Neither destination nor "
-                    + "destinationName are specified on this endpoint: " + endpoint);
+    }
+
+    protected void processInOnly(final Exchange exchange) {
+        final org.apache.camel.Message in = exchange.getIn();
+
+        String destinationName = endpoint.getDestinationName();
+        Destination destination = exchange.getProperty(JmsConstants.JMS_DESTINATION, Destination.class);
+        if (destination == null) {
+            destination = endpoint.getDestination();
+        }
+
+        // we must honor these special flags to preverse QoS
+        if (!endpoint.isPreserveMessageQos() && !endpoint.isExplicitQosEnabled()) {
+            Object replyTo = exchange.getIn().getHeader("JMSReplyTo");
+            if (replyTo != null) {
+                // we are routing an existing JmsMessage, origin from another JMS endpoint
+                // then we need to remove the existing JMSReplyTo
+                // as we are not out capable and thus do not expect a reply, and therefore
+                // the consumer of this message we send should not return a reply
+                String to = destinationName != null ? destinationName : "" + destination;
+                LOG.warn("Disabling JMSReplyTo as this Exchange is not OUT capable with JMSReplyTo: " + replyTo + " to destination: " + to + " for Exchange: " + exchange);
+                exchange.getIn().setHeader("JMSReplyTo", null);
             }
+        }
 
-            setMessageId(exchange);
+        MessageCreator messageCreator = new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(endpoint + " sending JMS message: " + message);
+                }
+
+                return message;
+            }
+        };
+        
+        if (destination != null) {
+            getInOnlyTemplate().send(destination, messageCreator);
+        } else if (destinationName != null) {
+            getInOnlyTemplate().send(destinationName, messageCreator);
+        } else  {
+            throw new IllegalArgumentException("Neither destination nor "
+                    + "destinationName are specified on this endpoint: " + endpoint);
         }
+
+        setMessageId(exchange);
     }
 
     protected void setMessageId(Exchange exchange) {

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java?rev=789236&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java Mon Jun 29 07:01:56 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsProducerWithJMSHeaderTest extends ContextTestSupport {
+
+    @Test
+    public void testInOnlyJMSPrioritory() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        // TODO: CAMEL-1689
+        // mock.message(0).header("JMSPriority").isEqualTo(2);
+
+        template.sendBodyAndHeader("activemq:queue:foo", "Hello World", "JMSPriority", "2");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:foo").to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsProducerWithJMSHeaderTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date