You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/02/02 08:07:29 UTC
svn commit: r1239470 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
tests/camel-itest/src/test/java/org/apache/camel/itest/jms/
tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/
Author: davsclaus
Date: Thu Feb 2 07:07:28 2012
New Revision: 1239470
URL: http://svn.apache.org/viewvc?rev=1239470&view=rev
Log:
CAMEL-4961: Keep information about transacted redelivered for the lifecycle of the exchange. As the impl of Message can change during routing, and otherwise lose that detail.
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java
camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Feb 2 07:07:28 2012
@@ -172,17 +172,18 @@ public interface Exchange {
String SPLIT_COMPLETE = "CamelSplitComplete";
String SPLIT_SIZE = "CamelSplitSize";
- String TIMER_COUNTER = "CamelTimerCounter";
- String TIMER_FIRED_TIME = "CamelTimerFiredTime";
- String TIMER_NAME = "CamelTimerName";
- String TIMER_PERIOD = "CamelTimerPeriod";
- String TIMER_TIME = "CamelTimerTime";
- String TO_ENDPOINT = "CamelToEndpoint";
- String TRACE_EVENT = "CamelTraceEvent";
- String TRACE_EVENT_NODE_ID = "CamelTraceEventNodeId";
- String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp";
- String TRACE_EVENT_EXCHANGE = "CamelTraceEventExchange";
- String TRANSFER_ENCODING = "Transfer-Encoding";
+ String TIMER_COUNTER = "CamelTimerCounter";
+ String TIMER_FIRED_TIME = "CamelTimerFiredTime";
+ String TIMER_NAME = "CamelTimerName";
+ String TIMER_PERIOD = "CamelTimerPeriod";
+ String TIMER_TIME = "CamelTimerTime";
+ String TO_ENDPOINT = "CamelToEndpoint";
+ String TRACE_EVENT = "CamelTraceEvent";
+ String TRACE_EVENT_NODE_ID = "CamelTraceEventNodeId";
+ String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp";
+ String TRACE_EVENT_EXCHANGE = "CamelTraceEventExchange";
+ String TRANSACTED_REDELIVERED = "CamelTransactedRedelivered";
+ String TRANSFER_ENCODING = "Transfer-Encoding";
String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted";
String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Thu Feb 2 07:07:28 2012
@@ -332,14 +332,30 @@ public final class DefaultExchange imple
}
public Boolean isTransactedRedelivered() {
- // lets avoid adding methods to the Message API, so we use the
- // DefaultMessage to allow component specific messages to extend
- // and implement the isTransactedRedelivered method.
- DefaultMessage msg = getIn(DefaultMessage.class);
- if (msg != null) {
- return msg.isTransactedRedelivered();
+ Boolean answer = null;
+
+ // check property first, as the implementation details to know if the message
+ // was transacted redelivered is message specific, and thus the message implementation
+ // could potentially change during routing, and therefore later we may not know if the
+ // original message was transacted redelivered or not, therefore we store this detail
+ // as a exchange property to keep it around for the lifecycle of the exchange
+ if (hasProperties()) {
+ answer = getProperty(Exchange.TRANSACTED_REDELIVERED, null, Boolean.class);
+ }
+
+ if (answer == null) {
+ // lets avoid adding methods to the Message API, so we use the
+ // DefaultMessage to allow component specific messages to extend
+ // and implement the isTransactedRedelivered method.
+ DefaultMessage msg = getIn(DefaultMessage.class);
+ if (msg != null) {
+ answer = msg.isTransactedRedelivered();
+ // store as property to keep around
+ setProperty(Exchange.TRANSACTED_REDELIVERED, answer);
+ }
}
- return null;
+
+ return answer;
}
public boolean isRollbackOnly() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Thu Feb 2 07:07:28 2012
@@ -108,6 +108,12 @@ public class DefaultUnitOfWork implement
exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
}
}
+
+ // setup whether the exchange is transacted redelivered or not (if not initialized before)
+ // store as property so we know that the origin exchange was transacted redelivered
+ if (exchange.getProperty(Exchange.TRANSACTED_REDELIVERED) == null) {
+ exchange.setProperty(Exchange.TRANSACTED_REDELIVERED, exchange.isTransactedRedelivered());
+ }
// fire event
try {
Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java?rev=1239470&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java Thu Feb 2 07:07:28 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Test that exchange.isTransactedRedelivered() is kept around even when
+ * Message implementation changes from JmsMessage to DefaultMessage, when routing
+ * from JMS over Jetty.
+ */
+public class JMSTransactionIsTransactedRedeliveredTest extends CamelSpringTestSupport {
+
+ private static int port = AvailablePortFinder.getNextAvailable(20039);
+ static {
+ //set them as system properties so Spring can use the property placeholder
+ //things to set them into the URL's in the spring contexts
+ System.setProperty("Jetty.port", Integer.toString(port));
+ }
+
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext(
+ "/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml");
+ }
+
+ @Override
+ protected int getExpectedRouteCount() {
+ // have to return 0 because we enable advice with
+ return 0;
+ }
+
+ @Override
+ public boolean isUseAdviceWith() {
+ return true;
+ }
+
+ @Test
+ public void testTransactionSuccess() throws Exception {
+ context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(AssertionError.class).to("log:error", "mock:error");
+ }
+ });
+ context.start();
+
+ // there should be no assertion errors
+ MockEndpoint error = getMockEndpoint("mock:error");
+ error.expectedMessageCount(0);
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("Bye World");
+ // success at 3rd attempt
+ mock.message(0).header("count").isEqualTo(3);
+
+ MockEndpoint jetty = getMockEndpoint("mock:jetty");
+ jetty.expectedMessageCount(1);
+
+ template.sendBody("activemq:queue:okay", "Hello World");
+
+ mock.assertIsSatisfied();
+ jetty.assertIsSatisfied();
+ error.assertIsSatisfied();
+ }
+
+ public static class MyBeforeProcessor implements Processor {
+ private int count;
+
+ public void process(Exchange exchange) throws Exception {
+ ++count;
+
+ // the first is not redelivered
+ if (count == 1) {
+ assertFalse("Should not be transacted redelivered", exchange.isTransactedRedelivered());
+ } else {
+ assertTrue("Should be transacted redelivered", exchange.isTransactedRedelivered());
+ }
+
+ if (count < 3) {
+ throw new IllegalArgumentException("Forced exception");
+ }
+ exchange.getIn().setHeader("count", count);
+ }
+ }
+
+ public static class MyAfterProcessor implements Processor {
+ public void process(Exchange exchange) throws Exception {
+ // origin message should be a transacted redeliveries
+ assertTrue("Should be transacted redelivered", exchange.isTransactedRedelivered());
+ }
+ }
+
+}
Added: camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml?rev=1239470&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml (added)
+++ camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml Thu Feb 2 07:07:28 2012
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:camel="http://camel.apache.org/schema/spring"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent"/>
+
+ <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
+ <property name="maxConnections" value="8"/>
+ <property name="connectionFactory" ref="jmsConnectionFactory"/>
+ </bean>
+
+ <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
+ </bean>
+
+ <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
+ <property name="connectionFactory" ref="poolConnectionFactory"/>
+ </bean>
+
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
+ <property name="connectionFactory" ref="poolConnectionFactory"/>
+ <property name="transacted" value="true"/>
+ <property name="transactionManager" ref="jmsTransactionManager"/>
+ <!-- we do not want any exceptions to be logged -->
+ <property name="errorHandlerLoggingLevel" value="OFF"/>
+ <property name="errorHandlerLogStackTrace" value="false"/>
+ </bean>
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+ <!-- we do not want any exceptions to be logged, neither that a rollback occurred -->
+ <errorHandler id="txEH" type="TransactionErrorHandler" rollbackLoggingLevel="OFF">
+ <redeliveryPolicy logStackTrace="false" logExhausted="false"/>
+ </errorHandler>
+
+ <route errorHandlerRef="txEH">
+ <from uri="activemq:queue:okay"/>
+ <transacted/>
+ <to uri="mock:before"/>
+ <process ref="myBeforeProcessor"/>
+ <to uri="jetty:http://localhost:{{Jetty.port}}/test"/>
+ <process ref="myAfterProcessor"/>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="jetty:http://localhost:{{Jetty.port}}/test"/>
+ <transform>
+ <constant>Bye World</constant>
+ </transform>
+ <to uri="mock:jetty"/>
+ </route>
+
+ </camelContext>
+
+ <bean id="myBeforeProcessor" class="org.apache.camel.itest.jms.JMSTransactionIsTransactedRedeliveredTest.MyBeforeProcessor"/>
+ <bean id="myAfterProcessor" class="org.apache.camel.itest.jms.JMSTransactionIsTransactedRedeliveredTest.MyAfterProcessor"/>
+
+</beans>