You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/04/25 19:20:29 UTC

svn commit: r532413 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/component/mock/ camel-jms/src/main/java/org/apache/camel/component/jms/ camel-jms/src/test/java/org/apache/came...

Author: chirino
Date: Wed Apr 25 10:20:28 2007
New Revision: 532413

URL: http://svn.apache.org/viewvc?view=rev&rev=532413
Log:
- Removed the transactionPolicy attribute from the RouteBuilder since this was not actually configuring the inbound transaction policy as first envisioned.  This may come back in a different shape/form.
- JpaMessageIdRepository now starts it's own transaction if called from a non transaction context.
- MockEndpoint now has a assertWait() method that can be used to wait for messages to arive before assertions are made against those messages.
- Added some more tests and imporved that TransactedJmsRouteTest
- The JmsConfiguration now uses a CacheLevel of CACHE_CONSUMER by default since this most efficient way to recieve messages from JMS.  


Added:
    activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
    activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
    activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
    activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Apr 25 10:20:28 2007
@@ -416,7 +416,7 @@
         else {
             processor = new CompositeProcessor<E>(answer);
         }
-        return wrapInTransactionInterceptor(processor);
+        return processor;
     }
 
     /**
@@ -435,13 +435,6 @@
      */
     protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws Exception {
         return getErrorHandlerBuilder().createErrorHandler(processor);
-    }
-
-    /**
-     * A strategy method which allows transaction interceptors to be applied to a processor
-     */
-    protected Processor<E> wrapInTransactionInterceptor(Processor<E> processor) throws Exception {
-        return getBuilder().getTransactionPolicy().wrap(processor);
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Wed Apr 25 10:20:28 2007
@@ -19,21 +19,13 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.FactoryFinder;
-import org.apache.camel.util.NoFactoryAvailableException;
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.Injector;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ReflectionInjector;
-import org.apache.camel.impl.NoPolicy;
 
 /**
  * A <a href="http://activemq.apache.org/camel/dsl.html">Java DSL</a>
@@ -45,7 +37,6 @@
     private List<FromBuilder<E>> fromBuilders = new ArrayList<FromBuilder<E>>();
     private AtomicBoolean initalized = new AtomicBoolean(false);
     private List<Route<E>> routes = new ArrayList<Route<E>>();
-    private Policy<E> transactionPolicy;
 
     protected RouteBuilder() {
         this(null);
@@ -94,17 +85,6 @@
         return this;
     }
 
-    /**
-     * Specifies the transaction interceptor to be used for routes created from this builder
-     *
-     * @param interceptor the transaction interceptor to use
-     * @return the current builder
-     */
-    public RouteBuilder<E> transactionPolicy(Policy<E> interceptor) {
-        setTransactionPolicy(interceptor);
-        return this;
-    }
-
     // Properties
     //-----------------------------------------------------------------------
     public CamelContext getContext() {
@@ -132,20 +112,6 @@
         return fromBuilders;
     }
 
-    public Policy<E> getTransactionPolicy() throws Exception {
-        if (transactionPolicy == null) {
-            transactionPolicy = createTransactionPolicy();
-        }
-        return transactionPolicy;
-    }
-
-    /**
-     * Sets the interceptor used wrap processors in a transaction
-     */
-    public void setTransactionPolicy(Policy<E> transactionInterceptor) {
-        this.transactionPolicy = transactionInterceptor;
-    }
-
     // Implementation methods
     //-----------------------------------------------------------------------
     protected void checkInitialized() throws Exception {
@@ -183,20 +149,6 @@
      */
     protected CamelContext createContainer() {
         return new DefaultCamelContext();
-    }
-
-    /**
-     * Factory method
-     */
-    protected Policy<E> createTransactionPolicy() throws Exception {
-        FactoryFinder finder = new FactoryFinder();
-        try {
-            return (Policy<E>) finder.newInstance("TransactionPolicy", getContext().getInjector());
-        }
-        catch (NoFactoryAvailableException e) {
-            // lets use the default
-            return new NoPolicy<E>();
-        }
     }
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Wed Apr 25 10:20:28 2007
@@ -17,6 +17,13 @@
  */
 package org.apache.camel.component.mock;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -29,13 +36,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A Mock endpoint which provides a literate, fluent API for testing routes using
  * a <a href="http://jmock.org/">JMock style</a> API.
@@ -53,6 +53,26 @@
     private long sleepForEmptyTest = 0L;
 	private int expectedMinimumCount=-1;
 
+    public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
+    	long start = System.currentTimeMillis();
+    	long left = unit.toMillis(timeout);
+    	long end = start + left;
+        for (MockEndpoint endpoint : endpoints) {
+			if( !endpoint.await(left, TimeUnit.MILLISECONDS) )
+	    		throw new AssertionError("Timeout waiting for endpoints to receive enough messages. "+endpoint.getEndpointUri()+" timed out.");
+			left = end - System.currentTimeMillis();
+			if( left <= 0 )
+				left = 0;
+        }
+    }
+
+    public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
+    	assertWait(timeout, unit, endpoints);
+        for (MockEndpoint endpoint : endpoints) {
+            endpoint.assertIsSatisfied();
+        }
+    }
+
     public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
         for (MockEndpoint endpoint : endpoints) {
             endpoint.assertIsSatisfied();
@@ -94,7 +114,7 @@
     public void assertIsSatisfied() throws InterruptedException {
         assertIsSatisfied(sleepForEmptyTest);
     }
-
+    
     /**
      * Validates that all the available expectations on this endpoint are satisfied; or throw an exception
      */
@@ -321,5 +341,18 @@
 
 	public int getExpectedMinimumCount() {
 		return expectedMinimumCount;
+	}
+
+	public void await() throws InterruptedException {
+		if( latch!=null ) {
+			latch.await();
+		}
+	}
+
+	public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+		if( latch!=null ) {
+			return latch.await(timeout, unit);
+		}
+		return true;
 	}
 }

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Wed Apr 25 10:20:28 2007
@@ -174,12 +174,16 @@
             if (concurrentConsumers >= 0) {
                 listenerContainer.setConcurrentConsumers(concurrentConsumers);
             }
+            
             if (cacheLevel >= 0) {
                 listenerContainer.setCacheLevel(cacheLevel);
-            }
-            if (cacheName != null) {
+            } else if (cacheName != null) {
                 listenerContainer.setCacheLevelName(cacheName);
+            } else {
+            	// Default to CACHE_CONSUMER unless specified.  This works best with most JMS providers.
+            	listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
             }
+            
             if (idleTaskExecutionLimit >= 0) {
                 listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
             }

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Apr 25 10:20:28 2007
@@ -131,4 +131,5 @@
     public void setSelector(String selector) {
         this.selector = selector;
     }
+
 }

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java Wed Apr 25 10:20:28 2007
@@ -18,13 +18,18 @@
 package org.apache.camel.component.jms;
 
 import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import static org.apache.camel.component.mock.MockEndpoint.assertWait;
+
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.builder.ProcessorFactory;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.DelegateProcessor;
@@ -57,26 +62,68 @@
 		        Policy notsupported = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_NOT_SUPPORTED"));
 		        Policy requirenew = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRES_NEW"));
 
-		        DelegateProcessor rollback = new DelegateProcessor() {
-		        	@Override
-		        	public void process(Object exchange) {
-		        		processNext(exchange);
-		        		throw new RuntimeException("rollback");
-		        	}
+		        Policy rollback = new Policy() {
+					public Processor wrap(Processor processor) {
+						return new DelegateProcessor(processor) {
+				        	@Override
+				        	public void process(Object exchange) {
+				        		processNext(exchange);
+				        		throw new RuntimeException("rollback");
+				        	}
+				        	
+				        	@Override
+				        	public String toString() {
+				                return "rollback(" + next + ")";
+				        	}
+				        };
+					}
+		        };
+		        
+		        Policy catchRollback = new Policy() {
+					public Processor wrap(Processor processor) {
+						return new DelegateProcessor(processor) {
+				        	@Override
+				        	public void process(Object exchange) {
+				        		try {
+				        			processNext(exchange);
+				        		} catch ( Throwable e ) {
+				        		}
+				        	}
+				        	@Override
+				        	public String toString() {
+				                return "catchRollback(" + next + ")";
+				        	}
+				        };
+					}
 		        };
-		        				
-		        // Used to verify that transacted sends will succeed.
-				from("activemq:queue:mock.a").trace().to("mock:a");      // Used to validate messages are sent to the target.
+		        
+				// NOTE: ErrorHandler has to be disabled since it operates within the failed transaction.
+		        inheritErrorHandler(false);		        
+		        // Used to validate messages are sent to the target.
+				from("activemq:queue:mock.a").trace().to("mock:a");
 		        
 				// Receive from a and send to target in 1 tx.
-		        transactionPolicy("PROPAGATION_REQUIRED");
-				from("activemq:queue:a").trace().to("activemq:queue:mock.a");
+				from("activemq:queue:a").to("activemq:queue:mock.a");
 				
 				// Cause an error after processing the send.  The send to activemq:queue:mock.a should rollback 
 				// since it is participating in the inbound transaction, but mock:b does not participate so we should see the message get
 				// there.  Also, expect multiple inbound retries as the message is rolled back.
-				from("activemq:queue:b").inheritErrorHandler(false).trace().intercept(rollback).to("activemq:queue:mock.a", "mock:b"); 
+		        //transactionPolicy(requried);
+				from("activemq:queue:b").policy(rollback).to("activemq:queue:mock.a", "mock:b"); 
+				
+				// Cause an error after processing the send in a new transaction.  The send to activemq:queue:mock.a should rollback 
+				// since the rollback is within it's transaction, but mock:b does not participate so we should see the message get
+				// there.  Also, expect the message to be successfully consumed since the rollback error is not propagated.
+		        //transactionPolicy(requried);
+				from("activemq:queue:c").policy(catchRollback).policy(requirenew).policy(rollback).to("activemq:queue:mock.a", "mock:b");
+				
+				// Cause an error after processing the send in without a transaction.  The send to activemq:queue:mock.a should succeed. 
+				// Also, expect the message to be successfully consumed since the rollback error is not propagated.
+		        from("activemq:queue:d").policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a"); 
 
+//		        JmsEndpoint endpoint = (JmsEndpoint)endpoint("activemq:queue:e");
+//		        from(endpoint).policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a"); 
+				
 			}
 		};
 	}
@@ -95,40 +142,62 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+        
+        for (Route route : this.context.getRoutes()) {
+    		System.out.println(route);
+		}
+        
         mockEndpointA = (MockEndpoint) resolveMandatoryEndpoint("mock:a");
         mockEndpointB = (MockEndpoint) resolveMandatoryEndpoint("mock:b");
     }
+    
+    @Override
+    protected void tearDown() throws Exception {
+    	super.tearDown();
+    	spring.destroy();
+    }
 
-	public void testReuqiredSend() throws Exception {
+	public void testSenarioA() throws Exception {
 		String expected = getName()+": "+System.currentTimeMillis();
         mockEndpointA.expectedBodiesReceived(expected);
         send("activemq:queue:a", expected);
         assertIsSatisfied(mockEndpointA);
 	}
 
-	public void testRequiredSendAndRollback() throws Exception {
+	public void testSenarioB() throws Exception {
 		String expected = getName()+": "+System.currentTimeMillis();
-        mockEndpointA.expectedMessageCount(0);
-        mockEndpointB.expectedMinimumMessageCount(5); // May be more since spring seems to go into tight loop redelivering.
+		mockEndpointA.expectedMessageCount(0);
+        mockEndpointB.expectedMinimumMessageCount(2); // May be more since spring seems to go into tight loop re-delivering.
         send("activemq:queue:b", expected);
+        assertIsSatisfied(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+	}
+
+	public void testSenarioC() throws Exception {
+		String expected = getName()+": "+System.currentTimeMillis();
+		mockEndpointA.expectedMessageCount(0);
+        mockEndpointB.expectedMessageCount(1); // Should only get 1 message the incoming transaction does not rollback.
+        send("activemq:queue:c", expected);
+
+        // Wait till the endpoints get their messages.
+        assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+
+        // Wait a little more to make sure extra messages are not received.
+        Thread.sleep(1000);
+        
         assertIsSatisfied(mockEndpointA,mockEndpointB);
-        int t = mockEndpointB.getReceivedCounter();
-        System.out.println("Actual Deliveries: "+t);
 	}
 
-	/** 
-	 * Validates that the send was done in a new transaction.  Message should be consumed from A,
-	 * But
-	 * 
-	 * @throws Exception
-	 */
-	public void xtestSendRequireNewAndRollack() throws Exception {
+	public void testSenarioD() throws Exception {
 		String expected = getName()+": "+System.currentTimeMillis();
-        mockEndpointA.expectedMessageCount(0);
+		mockEndpointA.expectedMessageCount(1);
+        send("activemq:queue:d", expected);
 
-        send("activemq:queue:a", expected);
+        // Wait till the endpoints get their messages.
+        assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
 
+        // Wait a little more to make sure extra messages are not received.
+        Thread.sleep(1000);
+        
         assertIsSatisfied(mockEndpointA);
 	}
-
 }

Added: activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml?view=auto&rev=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml (added)
+++ activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml Wed Apr 25 10:20:28 2007
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: xbean -->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker useJmx="false"  xmlns="http://activemq.org/config/1.0" persistent="false" brokerName="localhost">
+      
+  </broker>
+  
+</beans>
+<!-- END SNIPPET: xbean -->

Modified: activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml (original)
+++ activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml Wed Apr 25 10:20:28 2007
@@ -25,8 +25,12 @@
     <property name="connectionFactory" ref="jmsConnectionFactory"/>
   </bean>
 
-  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-    <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
+  <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
+    <property name="brokerURL" value="vm://localhost"/>
+  </bean>
+
+  <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
+    <property name="config" value="org/apache/camel/component/jms/activemq.xml"/>
   </bean>
 
 </beans>

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Wed Apr 25 10:20:28 2007
@@ -18,10 +18,19 @@
 package org.apache.camel.processor.idempotent.jpa;
 
 import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.springframework.orm.jpa.JpaCallback;
 import org.springframework.orm.jpa.JpaTemplate;
+import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
 
+import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
+import javax.persistence.PersistenceException;
+
 import java.util.List;
 
 /**
@@ -29,8 +38,9 @@
  */
 public class JpaMessageIdRepository implements MessageIdRepository {
     protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2";
-    private JpaTemplate template;
+    private JpaTemplate jpaTemplate;
     private String processorName;
+	private TransactionTemplate transactionTemplate;
 
     public static JpaMessageIdRepository jpaMessageIdRepository(String persistenceUnit, String processorName) {
         EntityManagerFactory entityManagerFactory = Persistence.createEntityManagerFactory(persistenceUnit);
@@ -42,22 +52,41 @@
     }
 
     public JpaMessageIdRepository(JpaTemplate template, String processorName) {
-        this.template = template;
+        this(template, createTransactionTemplate(template), processorName);
+    }
+
+    public JpaMessageIdRepository(JpaTemplate template, TransactionTemplate transactionTemplate, String processorName) {
+        this.jpaTemplate = template;
         this.processorName = processorName;
+        this.transactionTemplate=transactionTemplate;
+    }
+    
+    static private TransactionTemplate createTransactionTemplate(JpaTemplate jpaTemplate) {
+    	TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        return transactionTemplate;
     }
 
-    public boolean contains(String messageId) {
-        List list = template.find(QUERY_STRING, processorName, messageId);
-        if (list.isEmpty()) {
-            MessageProcessed processed = new MessageProcessed();
-            processed.setProcessorName(processorName);
-            processed.setMessageId(messageId);
-            template.persist(processed);
-            template.flush();
-            return false;
-        }
-        else {
-            return true;
-        }
+    public boolean contains(final String messageId) {
+    	// Run this in single transaction.
+    	Boolean rc = (Boolean) transactionTemplate.execute(new TransactionCallback(){
+			public Object doInTransaction(TransactionStatus arg0) {
+				
+		        List list = jpaTemplate.find(QUERY_STRING, processorName, messageId);
+		        if (list.isEmpty()) {
+		            MessageProcessed processed = new MessageProcessed();
+		            processed.setProcessorName(processorName);
+		            processed.setMessageId(messageId);
+		            jpaTemplate.persist(processed);
+		            jpaTemplate.flush();
+		            return Boolean.FALSE;
+		        }
+		        else {
+		            return Boolean.TRUE;
+		        }
+			}
+		});
+    	return rc.booleanValue();
     }
 }

Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java Wed Apr 25 10:20:28 2007
@@ -20,9 +20,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.spring.spi.SpringTransactionPolicy;
 import org.springframework.context.ApplicationContext;
-import org.springframework.transaction.support.TransactionTemplate;
 
 /**
  * An extension of the {@link RouteBuilder} to provide some additional helper methods
@@ -31,20 +29,6 @@
  */
 public abstract class SpringRouteBuilder<E extends Exchange> extends RouteBuilder<E> {
     private ApplicationContext applicationContext;
-
-    /**
-     * Configures a transaction interceptor on routes created by this builder using the named spring bean
-     * for the {@link TransactionTemplate} to use for the transaction
-     *
-     * @param transactionTemplateName the name of the spring bean in the application context which is the
-     *                                {@link TransactionTemplate} to use
-     * @return this builder
-     */
-    public SpringRouteBuilder<E> transactionPolicy(String transactionTemplateName) {
-        TransactionTemplate template = bean(TransactionTemplate.class, transactionTemplateName);
-        setTransactionPolicy(new SpringTransactionPolicy(template));
-        return this;
-    }
 
     /**
      * Looks up the bean with the given name in the application context and returns it, or throws an exception if the

Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java Wed Apr 25 10:20:28 2007
@@ -22,6 +22,7 @@
 import org.apache.camel.spi.Policy;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
 import org.springframework.transaction.support.TransactionCallbackWithoutResult;
 import org.springframework.transaction.support.TransactionTemplate;
@@ -62,8 +63,28 @@
 
             @Override
             public String toString() {
-                return "SpringTransactionPolicy[" + getNext() + "]";
+                return "SpringTransactionPolicy:"+propagationBehaviorToString(transactionTemplate.getPropagationBehavior())+"[" + getNext() + "]";
             }
+
+			private String propagationBehaviorToString(int propagationBehavior) {
+				switch( propagationBehavior ) {
+				case TransactionDefinition.PROPAGATION_MANDATORY:
+					return "PROPAGATION_MANDATORY";
+				case TransactionDefinition.PROPAGATION_NESTED:
+					return "PROPAGATION_NESTED";
+				case TransactionDefinition.PROPAGATION_NEVER:
+					return "PROPAGATION_NEVER";
+				case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
+					return "PROPAGATION_NOT_SUPPORTED";
+				case TransactionDefinition.PROPAGATION_REQUIRED:
+					return "PROPAGATION_REQUIRED";
+				case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
+					return "PROPAGATION_REQUIRES_NEW";
+				case TransactionDefinition.PROPAGATION_SUPPORTS:
+					return "PROPAGATION_SUPPORTS";
+				}
+				return "UNKOWN";
+			}
         };
     }