You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/02/02 08:07:29 UTC

svn commit: r1239470 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ tests/camel-itest/src/test/java/org/apache/camel/itest/jms/ tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/

Author: davsclaus
Date: Thu Feb  2 07:07:28 2012
New Revision: 1239470

URL: http://svn.apache.org/viewvc?rev=1239470&view=rev
Log:
CAMEL-4961: Keep the transacted-redelivered information for the lifecycle of the exchange, as the implementation of Message can change during routing and that detail would otherwise be lost.

Added:
    camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java
    camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Feb  2 07:07:28 2012
@@ -172,17 +172,18 @@ public interface Exchange {
     String SPLIT_COMPLETE     = "CamelSplitComplete";
     String SPLIT_SIZE         = "CamelSplitSize";
 
-    String TIMER_COUNTER         = "CamelTimerCounter";
-    String TIMER_FIRED_TIME      = "CamelTimerFiredTime";
-    String TIMER_NAME            = "CamelTimerName";
-    String TIMER_PERIOD          = "CamelTimerPeriod";
-    String TIMER_TIME            = "CamelTimerTime";
-    String TO_ENDPOINT           = "CamelToEndpoint";
-    String TRACE_EVENT           = "CamelTraceEvent";
-    String TRACE_EVENT_NODE_ID   = "CamelTraceEventNodeId";
-    String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp";
-    String TRACE_EVENT_EXCHANGE  = "CamelTraceEventExchange";
-    String TRANSFER_ENCODING     = "Transfer-Encoding";
+    String TIMER_COUNTER          = "CamelTimerCounter";
+    String TIMER_FIRED_TIME       = "CamelTimerFiredTime";
+    String TIMER_NAME             = "CamelTimerName";
+    String TIMER_PERIOD           = "CamelTimerPeriod";
+    String TIMER_TIME             = "CamelTimerTime";
+    String TO_ENDPOINT            = "CamelToEndpoint";
+    String TRACE_EVENT            = "CamelTraceEvent";
+    String TRACE_EVENT_NODE_ID    = "CamelTraceEventNodeId";
+    String TRACE_EVENT_TIMESTAMP  = "CamelTraceEventTimestamp";
+    String TRACE_EVENT_EXCHANGE   = "CamelTraceEventExchange";
+    String TRANSACTED_REDELIVERED = "CamelTransactedRedelivered";
+    String TRANSFER_ENCODING      = "Transfer-Encoding";
 
     String UNIT_OF_WORK_EXHAUSTED    = "CamelUnitOfWorkExhausted";
     String UNIT_OF_WORK_PROCESS_SYNC = "CamelUnitOfWorkProcessSync";

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Thu Feb  2 07:07:28 2012
@@ -332,14 +332,30 @@ public final class DefaultExchange imple
     }
 
     public Boolean isTransactedRedelivered() {
-        // lets avoid adding methods to the Message API, so we use the
-        // DefaultMessage to allow component specific messages to extend
-        // and implement the isTransactedRedelivered method.
-        DefaultMessage msg = getIn(DefaultMessage.class);
-        if (msg != null) {
-            return msg.isTransactedRedelivered();
+        Boolean answer = null;
+
+        // check the property first: whether the message was transacted redelivered is
+        // known only to the specific message implementation, and that implementation
+        // may change during routing, after which we could no longer tell whether the
+        // original message was transacted redelivered. Therefore we store this detail
+        // as an exchange property to keep it around for the lifecycle of the exchange
+        if (hasProperties()) {
+            answer = getProperty(Exchange.TRANSACTED_REDELIVERED, null, Boolean.class); 
+        }
+        
+        if (answer == null) {
+            // lets avoid adding methods to the Message API, so we use the
+            // DefaultMessage to allow component specific messages to extend
+            // and implement the isTransactedRedelivered method.
+            DefaultMessage msg = getIn(DefaultMessage.class);
+            if (msg != null) {
+                answer = msg.isTransactedRedelivered();
+                // store as property to keep around
+                setProperty(Exchange.TRANSACTED_REDELIVERED, answer);
+            }
         }
-        return null;
+
+        return answer;
     }
 
     public boolean isRollbackOnly() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1239470&r1=1239469&r2=1239470&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Thu Feb  2 07:07:28 2012
@@ -108,6 +108,12 @@ public class DefaultUnitOfWork implement
                 exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId);
             }
         }
+        
+        // set up whether the exchange is transacted redelivered or not (if not initialized before)
+        // store as a property so we know whether the original exchange was transacted redelivered
+        if (exchange.getProperty(Exchange.TRANSACTED_REDELIVERED) == null) {
+            exchange.setProperty(Exchange.TRANSACTED_REDELIVERED, exchange.isTransactedRedelivered());
+        }
 
         // fire event
         try {

Added: camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java?rev=1239470&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java (added)
+++ camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.java Thu Feb  2 07:07:28 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Test that exchange.isTransactedRedelivered() is kept around even when
+ * Message implementation changes from JmsMessage to DefaultMessage, when routing
+ * from JMS over Jetty.
+ */
+public class JMSTransactionIsTransactedRedeliveredTest extends CamelSpringTestSupport {
+
+    private static int port = AvailablePortFinder.getNextAvailable(20039);
+    static {
+        // set the port as a system property so the property placeholder
+        // can inject it into the URLs in the Spring context
+        System.setProperty("Jetty.port", Integer.toString(port));
+    }
+
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext(
+                "/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml");
+    }
+
+    @Override
+    protected int getExpectedRouteCount() {
+        // have to return 0 because we enable advice with
+        return 0;
+    }
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Test
+    public void testTransactionSuccess() throws Exception {
+        context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(AssertionError.class).to("log:error", "mock:error");
+            }
+        });
+        context.start();
+
+        // there should be no assertion errors
+        MockEndpoint error = getMockEndpoint("mock:error");
+        error.expectedMessageCount(0);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("Bye World");
+        // success at 3rd attempt
+        mock.message(0).header("count").isEqualTo(3);
+
+        MockEndpoint jetty = getMockEndpoint("mock:jetty");
+        jetty.expectedMessageCount(1);
+
+        template.sendBody("activemq:queue:okay", "Hello World");
+
+        mock.assertIsSatisfied();
+        jetty.assertIsSatisfied();
+        error.assertIsSatisfied();
+    }
+
+    public static class MyBeforeProcessor implements Processor {
+        private int count;
+
+        public void process(Exchange exchange) throws Exception {
+            ++count;
+
+            // the first is not redelivered
+            if (count == 1) {
+                assertFalse("Should not be transacted redelivered", exchange.isTransactedRedelivered());
+            } else {
+                assertTrue("Should be transacted redelivered", exchange.isTransactedRedelivered());
+            }
+
+            if (count < 3) {
+                throw new IllegalArgumentException("Forced exception");
+            }
+            exchange.getIn().setHeader("count", count);
+        }
+    }
+
+    public static class MyAfterProcessor implements Processor {
+        public void process(Exchange exchange) throws Exception {
+            // the original message should have been transacted redelivered
+            assertTrue("Should be transacted redelivered", exchange.isTransactedRedelivered());
+        }
+    }
+
+}

Added: camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml
URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml?rev=1239470&view=auto
==============================================================================
--- camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml (added)
+++ camel/trunk/tests/camel-itest/src/test/resources/org/apache/camel/itest/jms/JMSTransactionIsTransactedRedeliveredTest.xml Thu Feb  2 07:07:28 2012
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent"/>
+
+  <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
+    <property name="maxConnections" value="8"/>
+    <property name="connectionFactory" ref="jmsConnectionFactory"/>
+  </bean>
+
+  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
+  </bean>
+
+  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
+    <property name="connectionFactory" ref="poolConnectionFactory"/>
+  </bean>
+
+  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
+    <property name="connectionFactory" ref="poolConnectionFactory"/>
+    <property name="transacted" value="true"/>
+    <property name="transactionManager" ref="jmsTransactionManager"/>
+    <!-- we do not want any exceptions to be logged -->
+    <property name="errorHandlerLoggingLevel" value="OFF"/>
+    <property name="errorHandlerLogStackTrace" value="false"/>
+  </bean>
+
+  <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+    <!-- we do not want any exceptions to be logged, neither that a rollback occurred -->
+    <errorHandler id="txEH" type="TransactionErrorHandler" rollbackLoggingLevel="OFF">
+      <redeliveryPolicy logStackTrace="false" logExhausted="false"/>
+    </errorHandler>
+
+    <route errorHandlerRef="txEH">
+      <from uri="activemq:queue:okay"/>
+      <transacted/>
+      <to uri="mock:before"/>
+      <process ref="myBeforeProcessor"/>
+      <to uri="jetty:http://localhost:{{Jetty.port}}/test"/>
+      <process ref="myAfterProcessor"/>
+      <to uri="mock:result"/>
+    </route>
+
+    <route>
+      <from uri="jetty:http://localhost:{{Jetty.port}}/test"/>
+      <transform>
+        <constant>Bye World</constant>
+      </transform>
+      <to uri="mock:jetty"/>
+    </route>
+
+  </camelContext>
+
+  <bean id="myBeforeProcessor" class="org.apache.camel.itest.jms.JMSTransactionIsTransactedRedeliveredTest.MyBeforeProcessor"/>
+  <bean id="myAfterProcessor" class="org.apache.camel.itest.jms.JMSTransactionIsTransactedRedeliveredTest.MyAfterProcessor"/>
+
+</beans>