You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/23 14:00:46 UTC
svn commit: r757395 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Mon Mar 23 13:00:43 2009
New Revision: 757395
URL: http://svn.apache.org/viewvc?rev=757395&view=rev
Log:
CAMEL-585: Added option transferException to send a cause exception back as reply when using request-reply with Camel JMS.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java
- copied, changed from r757317, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Mon Mar 23 13:00:43 2009
@@ -60,11 +60,12 @@
}
public void onMessage(final Message message) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " consumer receiving JMS message: " + message);
+ }
+
RuntimeCamelException rce = null;
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " consumer receiving JMS message: " + message);
- }
Destination replyDestination = getReplyToDestination(message);
final JmsExchange exchange = createExchange(message, replyDestination);
if (eagerLoadingOfProperties) {
@@ -76,28 +77,41 @@
// get the correct jms message to send as reply
JmsMessage body = null;
+ Exception cause = null;
+ boolean sendReply = false;
if (exchange.isFailed()) {
if (exchange.getException() != null) {
// an exception occurred while processing
- // TODO: Camel-585 somekind of flag to determine if we should send the exchange back to the client
- // or do as now where we wrap as runtime exception to be thrown back to spring so it can do rollback
- rce = wrapRuntimeCamelException(exchange.getException());
+ if (endpoint.isTransferException()) {
+ // send the exception as reply
+ body = null;
+ cause = exchange.getException();
+ sendReply = true;
+ } else {
+ // only throw exception if endpoint is not configured to transfer exceptions
+ // back to caller
+ rce = wrapRuntimeCamelException(exchange.getException());
+ }
} else if (exchange.getFault().getBody() != null) {
// a fault occurred while processing
body = exchange.getFault();
+ sendReply = true;
}
} else {
// process OK so get the reply
body = exchange.getOut(false);
+ sendReply = true;
}
// send the reply if we got a response and the exchange is out capable
- if (rce == null && body != null && !disableReplyTo && exchange.getPattern().isOutCapable()) {
- sendReply(replyDestination, message, exchange, body);
+ if (sendReply && !disableReplyTo && exchange.getPattern().isOutCapable()) {
+ sendReply(replyDestination, message, exchange, body, cause);
}
+
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
+
if (rce != null) {
getExceptionHandler().handleException(rce);
throw rce;
@@ -194,7 +208,8 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange, final JmsMessage out) {
+ protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange,
+ final JmsMessage out, final Exception cause) {
if (replyDestination == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot send reply message as there is no replyDestination for: " + out);
@@ -203,7 +218,7 @@
}
getTemplate().send(replyDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
- Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
+ Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Mon Mar 23 13:00:43 2009
@@ -176,24 +176,28 @@
/**
* Creates a JMS message from the Camel exchange and message
*
+ * @param exchange the current exchange
* @param session the JMS session used to create the message
* @return a newly created JMS Message instance containing the
* @throws JMSException if the message could not be created
*/
public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
- return makeJmsMessage(exchange, exchange.getIn(), session);
+ return makeJmsMessage(exchange, exchange.getIn(), session, null);
}
/**
* Creates a JMS message from the Camel exchange and message
*
+ * @param exchange the current exchange
+ * @param camelMessage the body to make a javax.jms.Message as
* @param session the JMS session used to create the message
+ * @param cause optional exception occured that should be sent as reply instead of a regular body
* @return a newly created JMS Message instance containing the
* @throws JMSException if the message could not be created
*/
- public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session)
- throws JMSException {
+ public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session, Exception cause) throws JMSException {
Message answer = null;
+
boolean alwaysCopy = (endpoint != null) && endpoint.getConfiguration().isAlwaysCopyMessage();
if (!alwaysCopy && camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage)camelMessage;
@@ -201,10 +205,22 @@
answer = jmsMessage.getJmsMessage();
}
}
+
if (answer == null) {
- answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
- appendJmsProperties(answer, exchange, camelMessage);
+ if (cause != null) {
+ // an exception occured so send it as response
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will create JmsMessage with caused exception: " + cause);
+ }
+ // create jms message containg the caused exception
+ answer = createJmsMessage(cause, session);
+ } else {
+ // create regular jms message using the camel message body
+ answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
+ appendJmsProperties(answer, exchange, camelMessage);
+ }
}
+
return answer;
}
@@ -218,8 +234,7 @@
/**
* Appends the JMS headers from the Camel {@link JmsMessage}
*/
- public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in)
- throws JMSException {
+ public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in) throws JMSException {
Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
String headerName = entry.getKey();
@@ -296,6 +311,13 @@
return null;
}
+ protected Message createJmsMessage(Exception cause, Session session) throws JMSException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using JmsMessageType: " + Object);
+ }
+ return session.createObjectMessage(cause);
+ }
+
protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
JmsMessageType type = null;
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Mar 23 13:00:43 2009
@@ -134,6 +134,7 @@
private JmsMessageType jmsMessageType;
private JmsKeyFormatStrategy jmsKeyFormatStrategy;
private boolean transferExchange;
+ private boolean transferException;
public JmsConfiguration() {
}
@@ -1174,4 +1175,12 @@
public void setTransferExchange(boolean transferExchange) {
this.transferExchange = transferExchange;
}
+
+ public boolean isTransferException() {
+ return transferException;
+ }
+
+ public void setTransferException(boolean transferException) {
+ this.transferException = transferException;
+ }
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Mar 23 13:00:43 2009
@@ -787,6 +787,14 @@
getConfiguration().setTransferExchange(transferExchange);
}
+ public boolean isTransferException() {
+ return getConfiguration().isTransferException();
+ }
+
+ public void setTransferException(boolean transferException) {
+ getConfiguration().setTransferException(transferException);
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Mar 23 13:00:43 2009
@@ -99,13 +99,11 @@
try {
JmsConfiguration c = endpoint.getConfiguration();
if (c.getReplyTo() != null) {
- requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint
- .getExecutorService());
+ requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getExecutorService());
requestor.start();
} else {
if (affinity == RequestorAffinity.PER_PRODUCER) {
- requestor = new Requestor(endpoint.getConfiguration(), endpoint
- .getExecutorService());
+ requestor = new Requestor(endpoint.getConfiguration(), endpoint.getExecutorService());
requestor.start();
} else if (affinity == RequestorAffinity.PER_ENDPOINT) {
requestor = endpoint.getRequestor();
@@ -175,11 +173,11 @@
final CamelJmsTemplate template = (CamelJmsTemplate)getInOutTemplate();
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
- Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
message.setJMSReplyTo(replyTo);
requestor.setReplyToSelectorHeader(in, message);
- FutureTask future = null;
+ FutureTask future;
future = (!msgIdAsCorrId)
? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
: requestor.getReceiveFuture(callback);
@@ -223,13 +221,32 @@
}
}
if (message != null) {
- exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
+ // the response can be an exception
+ JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+ Object body = response.getBody();
+
+ if (endpoint.isTransferException() && body instanceof Exception) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reply recieved. Setting reply as Exception: " + body);
+ }
+ // we got an exception back and endpoint was configued to transfer exception
+ // therefore set response as exception
+ exchange.setException((Exception) body);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reply recieved. Setting reply as OUT message: " + body);
+ }
+ // regular response
+ exchange.setOut(response);
+ }
+
+ // correlation
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
exchange.getOut(false).setHeader("JMSCorrelationID", correlationId);
}
} else {
- // lets set a timed out exception
+ // no response, so lets set a timed out exception
exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
}
} catch (Exception e) {
@@ -238,7 +255,7 @@
} else {
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
- Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " sending JMS message: " + message);
}
Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java (from r757317, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java&r1=757317&r2=757395&rev=757395&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java Mon Mar 23 13:00:43 2009
@@ -23,53 +23,55 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
-public class JmsTransferExchangeTest extends ContextTestSupport {
-
- protected String getUri() {
- return "activemq:queue:foo?transferExchange=true";
- }
-
- public void testBodyOnly() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
+public class JmsTransferExceptionTest extends ContextTestSupport {
- template.sendBody("direct:start", "Hello World");
+ private static int counter;
- assertMockEndpointsSatisfied();
+ protected String getUri() {
+ return "activemq:queue:foo?transferException=true";
}
- public void testBodyAndHeaderOnly() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedHeaderReceived("foo", "cheese");
-
- template.sendBodyAndHeader("direct:start", "Hello World", "foo", "cheese");
+ @Override
+ protected void setUp() throws Exception {
+ counter = 0;
+ super.setUp();
+ }
+
+ public void testOk() throws Exception {
+ Object out = template.requestBody(getUri(), "Hello World");
+ assertEquals("Bye World", out);
+
+ assertEquals(1, counter);
+ }
+
+ public void testTransferExeption() throws Exception {
+ // should fail as we thrown an exception
+ MockEndpoint dead = getMockEndpoint("mock:dead");
+ dead.expectedMessageCount(1);
+
+ // we send something that causes a remote exception
+ // then we expect our producer template to thrown
+ // an exception with the remote exception as cause
+ try {
+ template.requestBody(getUri(), "Kabom");
+ fail("Should have thrown an exception");
+ } catch (RuntimeCamelException e) {
+ assertEquals("Boom", e.getCause().getMessage());
+ assertNotNull("Should contain a remote stacktrace", e.getCause().getStackTrace());
+ }
assertMockEndpointsSatisfied();
- }
- public void testSendExchange() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedHeaderReceived("foo", "cheese");
- mock.expectedPropertyReceived("bar", 123);
-
- template.send("direct:start", new Processor() {
- public void process(Exchange exchange) throws Exception {
- exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader("foo", "cheese");
- exchange.setProperty("bar", 123);
- }
- });
-
- assertMockEndpointsSatisfied();
+ // we still try redeliver
+ assertEquals(3, counter);
}
protected CamelContext createCamelContext() throws Exception {
@@ -86,10 +88,22 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to(getUri());
- from(getUri()).to("mock:result");
+ errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+ from(getUri())
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ counter++;
+
+ String body = exchange.getIn().getBody(String.class);
+ if (body.equals("Kabom")) {
+ throw new IllegalArgumentException("Boom");
+ }
+ exchange.getOut().setBody("Bye World");
+ }
+ });
}
};
}
-}
+}
\ No newline at end of file