You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/23 14:00:46 UTC

svn commit: r757395 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Mon Mar 23 13:00:43 2009
New Revision: 757395

URL: http://svn.apache.org/viewvc?rev=757395&view=rev
Log:
CAMEL-585: Added option transferException to send a cause exception back as reply when using request-reply with Camel JMS.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java
      - copied, changed from r757317, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Mon Mar 23 13:00:43 2009
@@ -60,11 +60,12 @@
     }
 
     public void onMessage(final Message message) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(endpoint + " consumer receiving JMS message: " + message);
+        }
+
         RuntimeCamelException rce = null;
         try {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(endpoint + " consumer receiving JMS message: " + message);
-            }
             Destination replyDestination = getReplyToDestination(message);
             final JmsExchange exchange = createExchange(message, replyDestination);
             if (eagerLoadingOfProperties) {
@@ -76,28 +77,41 @@
 
             // get the correct jms message to send as reply
             JmsMessage body = null;
+            Exception cause = null;
+            boolean sendReply = false;
             if (exchange.isFailed()) {
                 if (exchange.getException() != null) {
                     // an exception occurred while processing
-                    // TODO: Camel-585 somekind of flag to determine if we should send the exchange back to the client
-                    // or do as now where we wrap as runtime exception to be thrown back to spring so it can do rollback
-                    rce = wrapRuntimeCamelException(exchange.getException());
+                    if (endpoint.isTransferException()) {
+                        // send the exception as reply
+                        body = null;
+                        cause = exchange.getException();
+                        sendReply = true;
+                    } else {
+                        // only throw exception if endpoint is not configured to transfer exceptions
+                        // back to caller
+                        rce = wrapRuntimeCamelException(exchange.getException());
+                    }
                 } else if (exchange.getFault().getBody() != null) {
                     // a fault occurred while processing
                     body = exchange.getFault();
+                    sendReply = true;
                 }
             } else {
                 // process OK so get the reply
                 body = exchange.getOut(false);
+                sendReply = true;
             }
 
             // send the reply if we got a response and the exchange is out capable
-            if (rce == null && body != null && !disableReplyTo && exchange.getPattern().isOutCapable()) {
-                sendReply(replyDestination, message, exchange, body);
+            if (sendReply && !disableReplyTo && exchange.getPattern().isOutCapable()) {
+                sendReply(replyDestination, message, exchange, body, cause);
             }
+
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
+
         if (rce != null) {
             getExceptionHandler().handleException(rce);
             throw rce;
@@ -194,7 +208,8 @@
     // Implementation methods
     //-------------------------------------------------------------------------
 
-    protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange, final JmsMessage out) {
+    protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange,
+                             final JmsMessage out, final Exception cause) {
         if (replyDestination == null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Cannot send reply message as there is no replyDestination for: " + out);
@@ -203,7 +218,7 @@
         }
         getTemplate().send(replyDestination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
-                Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
+                Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
 
                 if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
                     String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Mon Mar 23 13:00:43 2009
@@ -176,24 +176,28 @@
     /**
      * Creates a JMS message from the Camel exchange and message
      *
+     * @param exchange the current exchange
      * @param session the JMS session used to create the message
      * @return a newly created JMS Message instance containing the
      * @throws JMSException if the message could not be created
      */
     public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
-        return makeJmsMessage(exchange, exchange.getIn(), session);
+        return makeJmsMessage(exchange, exchange.getIn(), session, null);
     }
 
     /**
      * Creates a JMS message from the Camel exchange and message
      *
+     * @param exchange the current exchange
+     * @param camelMessage the body to make a javax.jms.Message as
      * @param session the JMS session used to create the message
+     * @param cause optional exception occured that should be sent as reply instead of a regular body
      * @return a newly created JMS Message instance containing the
      * @throws JMSException if the message could not be created
      */
-    public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session)
-        throws JMSException {
+    public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session, Exception cause) throws JMSException {
         Message answer = null;
+
         boolean alwaysCopy = (endpoint != null) && endpoint.getConfiguration().isAlwaysCopyMessage();
         if (!alwaysCopy && camelMessage instanceof JmsMessage) {
             JmsMessage jmsMessage = (JmsMessage)camelMessage;
@@ -201,10 +205,22 @@
                 answer = jmsMessage.getJmsMessage();
             }
         }
+
         if (answer == null) {
-            answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
-            appendJmsProperties(answer, exchange, camelMessage);
+            if (cause != null) {
+                // an exception occured so send it as response
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Will create JmsMessage with caused exception: " + cause);
+                }
+                // create jms message containg the caused exception
+                answer = createJmsMessage(cause, session);
+            } else {
+                // create regular jms message using the camel message body
+                answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
+                appendJmsProperties(answer, exchange, camelMessage);
+            }
         }
+
         return answer;
     }
 
@@ -218,8 +234,7 @@
     /**
      * Appends the JMS headers from the Camel {@link JmsMessage}
      */
-    public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in)
-        throws JMSException {
+    public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in) throws JMSException {
         Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
         for (Map.Entry<String, Object> entry : entries) {
             String headerName = entry.getKey();
@@ -296,6 +311,13 @@
         return null;
     }
 
+    protected Message createJmsMessage(Exception cause, Session session) throws JMSException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Using JmsMessageType: " + Object);
+        }
+        return session.createObjectMessage(cause);
+    }
+
     protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
         JmsMessageType type = null;
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Mar 23 13:00:43 2009
@@ -134,6 +134,7 @@
     private JmsMessageType jmsMessageType;
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
     private boolean transferExchange;
+    private boolean transferException;
 
     public JmsConfiguration() {
     }
@@ -1174,4 +1175,12 @@
     public void setTransferExchange(boolean transferExchange) {
         this.transferExchange = transferExchange;
     }
+
+    public boolean isTransferException() {
+        return transferException;
+    }
+
+    public void setTransferException(boolean transferException) {
+        this.transferException = transferException;
+    }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Mar 23 13:00:43 2009
@@ -787,6 +787,14 @@
         getConfiguration().setTransferExchange(transferExchange);
     }
 
+    public boolean isTransferException() {
+        return getConfiguration().isTransferException();
+    }
+
+    public void setTransferException(boolean transferException) {
+        getConfiguration().setTransferException(transferException);
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Mar 23 13:00:43 2009
@@ -99,13 +99,11 @@
                 try {
                     JmsConfiguration c = endpoint.getConfiguration();
                     if (c.getReplyTo() != null) {
-                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint
-                            .getExecutorService());
+                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getExecutorService());
                         requestor.start();
                     } else {
                         if (affinity == RequestorAffinity.PER_PRODUCER) {
-                            requestor = new Requestor(endpoint.getConfiguration(), endpoint
-                                .getExecutorService());
+                            requestor = new Requestor(endpoint.getConfiguration(), endpoint.getExecutorService());
                             requestor.start();
                         } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
                             requestor = endpoint.getRequestor();
@@ -175,11 +173,11 @@
             final CamelJmsTemplate template = (CamelJmsTemplate)getInOutTemplate();
             MessageCreator messageCreator = new MessageCreator() {
                 public Message createMessage(Session session) throws JMSException {
-                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
                     message.setJMSReplyTo(replyTo);
                     requestor.setReplyToSelectorHeader(in, message);
 
-                    FutureTask future = null;
+                    FutureTask future;
                     future = (!msgIdAsCorrId)
                             ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
                             : requestor.getReceiveFuture(callback);
@@ -223,13 +221,32 @@
                     }
                 }
                 if (message != null) {
-                    exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
+                    // the response can be an exception
+                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+                    Object body = response.getBody();
+
+                    if (endpoint.isTransferException() && body instanceof Exception) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Reply recieved. Setting reply as Exception: " + body);
+                        }
+                        // we got an exception back and endpoint was configued to transfer exception
+                        // therefore set response as exception
+                        exchange.setException((Exception) body);
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
+                        }
+                        // regular response
+                        exchange.setOut(response);
+                    }
+
+                    // correlation
                     if (correlationId != null) {
                         message.setJMSCorrelationID(correlationId);
                         exchange.getOut(false).setHeader("JMSCorrelationID", correlationId);
                     }
                 } else {
-                    // lets set a timed out exception
+                    // no response, so lets set a timed out exception
                     exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
                 }
             } catch (Exception e) {
@@ -238,7 +255,7 @@
         } else {
             MessageCreator messageCreator = new MessageCreator() {
                 public Message createMessage(Session session) throws JMSException {
-                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(endpoint + " sending JMS message: " + message);
                     }

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java (from r757317, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java&r1=757317&r2=757395&rev=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java Mon Mar 23 13:00:43 2009
@@ -23,53 +23,55 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision$
  */
-public class JmsTransferExchangeTest extends ContextTestSupport {
-
-    protected String getUri() {
-        return "activemq:queue:foo?transferExchange=true";
-    }
-
-    public void testBodyOnly() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
+public class JmsTransferExceptionTest extends ContextTestSupport {
 
-        template.sendBody("direct:start", "Hello World");
+    private static int counter;
 
-        assertMockEndpointsSatisfied();
+    protected String getUri() {
+        return "activemq:queue:foo?transferException=true";
     }
 
-    public void testBodyAndHeaderOnly() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedHeaderReceived("foo", "cheese");
-
-        template.sendBodyAndHeader("direct:start", "Hello World", "foo", "cheese");
+    @Override
+    protected void setUp() throws Exception {
+        counter = 0;
+        super.setUp();
+    }
+
+    public void testOk() throws Exception {
+        Object out = template.requestBody(getUri(), "Hello World");
+        assertEquals("Bye World", out);
+
+        assertEquals(1, counter);
+    }
+
+    public void testTransferExeption() throws Exception {
+        // should fail as we thrown an exception
+        MockEndpoint dead = getMockEndpoint("mock:dead");
+        dead.expectedMessageCount(1);
+
+        // we send something that causes a remote exception
+        // then we expect our producer template to thrown
+        // an exception with the remote exception as cause
+        try {
+            template.requestBody(getUri(), "Kabom");
+            fail("Should have thrown an exception");
+        } catch (RuntimeCamelException e) {
+            assertEquals("Boom", e.getCause().getMessage());
+            assertNotNull("Should contain a remote stacktrace", e.getCause().getStackTrace());
+        }
 
         assertMockEndpointsSatisfied();
-    }
 
-    public void testSendExchange() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedHeaderReceived("foo", "cheese");
-        mock.expectedPropertyReceived("bar", 123);
-
-        template.send("direct:start", new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("Hello World");
-                exchange.getIn().setHeader("foo", "cheese");
-                exchange.setProperty("bar", 123);
-            }
-        });
-
-        assertMockEndpointsSatisfied();
+        // we still try redeliver
+        assertEquals(3, counter);
     }
 
     protected CamelContext createCamelContext() throws Exception {
@@ -86,10 +88,22 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to(getUri());
-                from(getUri()).to("mock:result");
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+                from(getUri())
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                counter++;
+
+                                String body = exchange.getIn().getBody(String.class);
+                                if (body.equals("Kabom")) {
+                                    throw new IllegalArgumentException("Boom");
+                                }
+                                exchange.getOut().setBody("Bye World");
+                            }
+                        });
             }
         };
     }
 
-}
+}
\ No newline at end of file