You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/04/25 19:20:29 UTC
svn commit: r532413 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/component/mock/
camel-jms/src/main/java/org/apache/camel/component/jms/
camel-jms/src/test/java/org/apache/came...
Author: chirino
Date: Wed Apr 25 10:20:28 2007
New Revision: 532413
URL: http://svn.apache.org/viewvc?view=rev&rev=532413
Log:
- Removed the transactionPolicy attribute from the RouteBuilder since this was not actually configuring the inbound transaction policy as first envisioned. This may come back in a different shape/form.
- JpaMessageIdRepository now starts it's own transaction if called from a non transaction context.
- MockEndpoint now has a assertWait() method that can be used to wait for messages to arive before assertions are made against those messages.
- Added some more tests and imporved that TransactedJmsRouteTest
- The JmsConfiguration now uses a CacheLevel of CACHE_CONSUMER by default since this most efficient way to recieve messages from JMS.
Added:
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Apr 25 10:20:28 2007
@@ -416,7 +416,7 @@
else {
processor = new CompositeProcessor<E>(answer);
}
- return wrapInTransactionInterceptor(processor);
+ return processor;
}
/**
@@ -435,13 +435,6 @@
*/
protected Processor<E> wrapInErrorHandler(Processor<E> processor) throws Exception {
return getErrorHandlerBuilder().createErrorHandler(processor);
- }
-
- /**
- * A strategy method which allows transaction interceptors to be applied to a processor
- */
- protected Processor<E> wrapInTransactionInterceptor(Processor<E> processor) throws Exception {
- return getBuilder().getTransactionPolicy().wrap(processor);
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Wed Apr 25 10:20:28 2007
@@ -19,21 +19,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.FactoryFinder;
-import org.apache.camel.util.NoFactoryAvailableException;
-import org.apache.camel.spi.Policy;
-import org.apache.camel.spi.Injector;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ReflectionInjector;
-import org.apache.camel.impl.NoPolicy;
/**
* A <a href="http://activemq.apache.org/camel/dsl.html">Java DSL</a>
@@ -45,7 +37,6 @@
private List<FromBuilder<E>> fromBuilders = new ArrayList<FromBuilder<E>>();
private AtomicBoolean initalized = new AtomicBoolean(false);
private List<Route<E>> routes = new ArrayList<Route<E>>();
- private Policy<E> transactionPolicy;
protected RouteBuilder() {
this(null);
@@ -94,17 +85,6 @@
return this;
}
- /**
- * Specifies the transaction interceptor to be used for routes created from this builder
- *
- * @param interceptor the transaction interceptor to use
- * @return the current builder
- */
- public RouteBuilder<E> transactionPolicy(Policy<E> interceptor) {
- setTransactionPolicy(interceptor);
- return this;
- }
-
// Properties
//-----------------------------------------------------------------------
public CamelContext getContext() {
@@ -132,20 +112,6 @@
return fromBuilders;
}
- public Policy<E> getTransactionPolicy() throws Exception {
- if (transactionPolicy == null) {
- transactionPolicy = createTransactionPolicy();
- }
- return transactionPolicy;
- }
-
- /**
- * Sets the interceptor used wrap processors in a transaction
- */
- public void setTransactionPolicy(Policy<E> transactionInterceptor) {
- this.transactionPolicy = transactionInterceptor;
- }
-
// Implementation methods
//-----------------------------------------------------------------------
protected void checkInitialized() throws Exception {
@@ -183,20 +149,6 @@
*/
protected CamelContext createContainer() {
return new DefaultCamelContext();
- }
-
- /**
- * Factory method
- */
- protected Policy<E> createTransactionPolicy() throws Exception {
- FactoryFinder finder = new FactoryFinder();
- try {
- return (Policy<E>) finder.newInstance("TransactionPolicy", getContext().getInjector());
- }
- catch (NoFactoryAvailableException e) {
- // lets use the default
- return new NoPolicy<E>();
- }
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Wed Apr 25 10:20:28 2007
@@ -17,6 +17,13 @@
*/
package org.apache.camel.component.mock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
@@ -29,13 +36,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* A Mock endpoint which provides a literate, fluent API for testing routes using
* a <a href="http://jmock.org/">JMock style</a> API.
@@ -53,6 +53,26 @@
private long sleepForEmptyTest = 0L;
private int expectedMinimumCount=-1;
+ public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
+ long start = System.currentTimeMillis();
+ long left = unit.toMillis(timeout);
+ long end = start + left;
+ for (MockEndpoint endpoint : endpoints) {
+ if( !endpoint.await(left, TimeUnit.MILLISECONDS) )
+ throw new AssertionError("Timeout waiting for endpoints to receive enough messages. "+endpoint.getEndpointUri()+" timed out.");
+ left = end - System.currentTimeMillis();
+ if( left <= 0 )
+ left = 0;
+ }
+ }
+
+ public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
+ assertWait(timeout, unit, endpoints);
+ for (MockEndpoint endpoint : endpoints) {
+ endpoint.assertIsSatisfied();
+ }
+ }
+
public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
for (MockEndpoint endpoint : endpoints) {
endpoint.assertIsSatisfied();
@@ -94,7 +114,7 @@
public void assertIsSatisfied() throws InterruptedException {
assertIsSatisfied(sleepForEmptyTest);
}
-
+
/**
* Validates that all the available expectations on this endpoint are satisfied; or throw an exception
*/
@@ -321,5 +341,18 @@
public int getExpectedMinimumCount() {
return expectedMinimumCount;
+ }
+
+ public void await() throws InterruptedException {
+ if( latch!=null ) {
+ latch.await();
+ }
+ }
+
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ if( latch!=null ) {
+ return latch.await(timeout, unit);
+ }
+ return true;
}
}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Wed Apr 25 10:20:28 2007
@@ -174,12 +174,16 @@
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
+
if (cacheLevel >= 0) {
listenerContainer.setCacheLevel(cacheLevel);
- }
- if (cacheName != null) {
+ } else if (cacheName != null) {
listenerContainer.setCacheLevelName(cacheName);
+ } else {
+ // Default to CACHE_CONSUMER unless specified. This works best with most JMS providers.
+ listenerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
}
+
if (idleTaskExecutionLimit >= 0) {
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Apr 25 10:20:28 2007
@@ -131,4 +131,5 @@
public void setSelector(String selector) {
this.selector = selector;
}
+
}
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/TransactedJmsRouteTest.java Wed Apr 25 10:20:28 2007
@@ -18,13 +18,18 @@
package org.apache.camel.component.jms;
import static org.apache.camel.component.mock.MockEndpoint.assertIsSatisfied;
+import static org.apache.camel.component.mock.MockEndpoint.assertWait;
+
+import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.builder.ProcessorFactory;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.DelegateProcessor;
@@ -57,26 +62,68 @@
Policy notsupported = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_NOT_SUPPORTED"));
Policy requirenew = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRES_NEW"));
- DelegateProcessor rollback = new DelegateProcessor() {
- @Override
- public void process(Object exchange) {
- processNext(exchange);
- throw new RuntimeException("rollback");
- }
+ Policy rollback = new Policy() {
+ public Processor wrap(Processor processor) {
+ return new DelegateProcessor(processor) {
+ @Override
+ public void process(Object exchange) {
+ processNext(exchange);
+ throw new RuntimeException("rollback");
+ }
+
+ @Override
+ public String toString() {
+ return "rollback(" + next + ")";
+ }
+ };
+ }
+ };
+
+ Policy catchRollback = new Policy() {
+ public Processor wrap(Processor processor) {
+ return new DelegateProcessor(processor) {
+ @Override
+ public void process(Object exchange) {
+ try {
+ processNext(exchange);
+ } catch ( Throwable e ) {
+ }
+ }
+ @Override
+ public String toString() {
+ return "catchRollback(" + next + ")";
+ }
+ };
+ }
};
-
- // Used to verify that transacted sends will succeed.
- from("activemq:queue:mock.a").trace().to("mock:a"); // Used to validate messages are sent to the target.
+
+ // NOTE: ErrorHandler has to be disabled since it operates within the failed transaction.
+ inheritErrorHandler(false);
+ // Used to validate messages are sent to the target.
+ from("activemq:queue:mock.a").trace().to("mock:a");
// Receive from a and send to target in 1 tx.
- transactionPolicy("PROPAGATION_REQUIRED");
- from("activemq:queue:a").trace().to("activemq:queue:mock.a");
+ from("activemq:queue:a").to("activemq:queue:mock.a");
// Cause an error after processing the send. The send to activemq:queue:mock.a should rollback
// since it is participating in the inbound transaction, but mock:b does not participate so we should see the message get
// there. Also, expect multiple inbound retries as the message is rolled back.
- from("activemq:queue:b").inheritErrorHandler(false).trace().intercept(rollback).to("activemq:queue:mock.a", "mock:b");
+ //transactionPolicy(requried);
+ from("activemq:queue:b").policy(rollback).to("activemq:queue:mock.a", "mock:b");
+
+ // Cause an error after processing the send in a new transaction. The send to activemq:queue:mock.a should rollback
+ // since the rollback is within it's transaction, but mock:b does not participate so we should see the message get
+ // there. Also, expect the message to be successfully consumed since the rollback error is not propagated.
+ //transactionPolicy(requried);
+ from("activemq:queue:c").policy(catchRollback).policy(requirenew).policy(rollback).to("activemq:queue:mock.a", "mock:b");
+
+ // Cause an error after processing the send in without a transaction. The send to activemq:queue:mock.a should succeed.
+ // Also, expect the message to be successfully consumed since the rollback error is not propagated.
+ from("activemq:queue:d").policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a");
+// JmsEndpoint endpoint = (JmsEndpoint)endpoint("activemq:queue:e");
+// from(endpoint).policy(catchRollback).policy(notsupported).policy(rollback).to("activemq:queue:mock.a");
+
}
};
}
@@ -95,40 +142,62 @@
@Override
protected void setUp() throws Exception {
super.setUp();
+
+ for (Route route : this.context.getRoutes()) {
+ System.out.println(route);
+ }
+
mockEndpointA = (MockEndpoint) resolveMandatoryEndpoint("mock:a");
mockEndpointB = (MockEndpoint) resolveMandatoryEndpoint("mock:b");
}
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ spring.destroy();
+ }
- public void testReuqiredSend() throws Exception {
+ public void testSenarioA() throws Exception {
String expected = getName()+": "+System.currentTimeMillis();
mockEndpointA.expectedBodiesReceived(expected);
send("activemq:queue:a", expected);
assertIsSatisfied(mockEndpointA);
}
- public void testRequiredSendAndRollback() throws Exception {
+ public void testSenarioB() throws Exception {
String expected = getName()+": "+System.currentTimeMillis();
- mockEndpointA.expectedMessageCount(0);
- mockEndpointB.expectedMinimumMessageCount(5); // May be more since spring seems to go into tight loop redelivering.
+ mockEndpointA.expectedMessageCount(0);
+ mockEndpointB.expectedMinimumMessageCount(2); // May be more since spring seems to go into tight loop re-delivering.
send("activemq:queue:b", expected);
+ assertIsSatisfied(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+ }
+
+ public void testSenarioC() throws Exception {
+ String expected = getName()+": "+System.currentTimeMillis();
+ mockEndpointA.expectedMessageCount(0);
+ mockEndpointB.expectedMessageCount(1); // Should only get 1 message the incoming transaction does not rollback.
+ send("activemq:queue:c", expected);
+
+ // Wait till the endpoints get their messages.
+ assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+
+ // Wait a little more to make sure extra messages are not received.
+ Thread.sleep(1000);
+
assertIsSatisfied(mockEndpointA,mockEndpointB);
- int t = mockEndpointB.getReceivedCounter();
- System.out.println("Actual Deliveries: "+t);
}
- /**
- * Validates that the send was done in a new transaction. Message should be consumed from A,
- * But
- *
- * @throws Exception
- */
- public void xtestSendRequireNewAndRollack() throws Exception {
+ public void testSenarioD() throws Exception {
String expected = getName()+": "+System.currentTimeMillis();
- mockEndpointA.expectedMessageCount(0);
+ mockEndpointA.expectedMessageCount(1);
+ send("activemq:queue:d", expected);
- send("activemq:queue:a", expected);
+ // Wait till the endpoints get their messages.
+ assertWait(5, TimeUnit.SECONDS, mockEndpointA,mockEndpointB);
+ // Wait a little more to make sure extra messages are not received.
+ Thread.sleep(1000);
+
assertIsSatisfied(mockEndpointA);
}
-
}
Added: activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml?view=auto&rev=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml (added)
+++ activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/activemq.xml Wed Apr 25 10:20:28 2007
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- START SNIPPET: xbean -->
+<beans>
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker useJmx="false" xmlns="http://activemq.org/config/1.0" persistent="false" brokerName="localhost">
+
+ </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->
Modified: activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml (original)
+++ activemq/camel/trunk/camel-jms/src/test/resources/org/apache/camel/component/jms/spring.xml Wed Apr 25 10:20:28 2007
@@ -25,8 +25,12 @@
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
- <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/>
+ <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
+ <property name="brokerURL" value="vm://localhost"/>
+ </bean>
+
+ <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
+ <property name="config" value="org/apache/camel/component/jms/activemq.xml"/>
</bean>
</beans>
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java Wed Apr 25 10:20:28 2007
@@ -18,10 +18,19 @@
package org.apache.camel.processor.idempotent.jpa;
import org.apache.camel.processor.idempotent.MessageIdRepository;
+import org.springframework.orm.jpa.JpaCallback;
import org.springframework.orm.jpa.JpaTemplate;
+import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
+import javax.persistence.PersistenceException;
+
import java.util.List;
/**
@@ -29,8 +38,9 @@
*/
public class JpaMessageIdRepository implements MessageIdRepository {
protected static final String QUERY_STRING = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1 and x.messageId = ?2";
- private JpaTemplate template;
+ private JpaTemplate jpaTemplate;
private String processorName;
+ private TransactionTemplate transactionTemplate;
public static JpaMessageIdRepository jpaMessageIdRepository(String persistenceUnit, String processorName) {
EntityManagerFactory entityManagerFactory = Persistence.createEntityManagerFactory(persistenceUnit);
@@ -42,22 +52,41 @@
}
public JpaMessageIdRepository(JpaTemplate template, String processorName) {
- this.template = template;
+ this(template, createTransactionTemplate(template), processorName);
+ }
+
+ public JpaMessageIdRepository(JpaTemplate template, TransactionTemplate transactionTemplate, String processorName) {
+ this.jpaTemplate = template;
this.processorName = processorName;
+ this.transactionTemplate=transactionTemplate;
+ }
+
+ static private TransactionTemplate createTransactionTemplate(JpaTemplate jpaTemplate) {
+ TransactionTemplate transactionTemplate = new TransactionTemplate();
+ transactionTemplate.setTransactionManager(new JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+ transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+ return transactionTemplate;
}
- public boolean contains(String messageId) {
- List list = template.find(QUERY_STRING, processorName, messageId);
- if (list.isEmpty()) {
- MessageProcessed processed = new MessageProcessed();
- processed.setProcessorName(processorName);
- processed.setMessageId(messageId);
- template.persist(processed);
- template.flush();
- return false;
- }
- else {
- return true;
- }
+ public boolean contains(final String messageId) {
+ // Run this in single transaction.
+ Boolean rc = (Boolean) transactionTemplate.execute(new TransactionCallback(){
+ public Object doInTransaction(TransactionStatus arg0) {
+
+ List list = jpaTemplate.find(QUERY_STRING, processorName, messageId);
+ if (list.isEmpty()) {
+ MessageProcessed processed = new MessageProcessed();
+ processed.setProcessorName(processorName);
+ processed.setMessageId(messageId);
+ jpaTemplate.persist(processed);
+ jpaTemplate.flush();
+ return Boolean.FALSE;
+ }
+ else {
+ return Boolean.TRUE;
+ }
+ }
+ });
+ return rc.booleanValue();
}
}
Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/SpringRouteBuilder.java Wed Apr 25 10:20:28 2007
@@ -20,9 +20,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.spring.spi.SpringTransactionPolicy;
import org.springframework.context.ApplicationContext;
-import org.springframework.transaction.support.TransactionTemplate;
/**
* An extension of the {@link RouteBuilder} to provide some additional helper methods
@@ -31,20 +29,6 @@
*/
public abstract class SpringRouteBuilder<E extends Exchange> extends RouteBuilder<E> {
private ApplicationContext applicationContext;
-
- /**
- * Configures a transaction interceptor on routes created by this builder using the named spring bean
- * for the {@link TransactionTemplate} to use for the transaction
- *
- * @param transactionTemplateName the name of the spring bean in the application context which is the
- * {@link TransactionTemplate} to use
- * @return this builder
- */
- public SpringRouteBuilder<E> transactionPolicy(String transactionTemplateName) {
- TransactionTemplate template = bean(TransactionTemplate.class, transactionTemplateName);
- setTransactionPolicy(new SpringTransactionPolicy(template));
- return this;
- }
/**
* Looks up the bean with the given name in the application context and returns it, or throws an exception if the
Modified: activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java?view=diff&rev=532413&r1=532412&r2=532413
==============================================================================
--- activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java (original)
+++ activemq/camel/trunk/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringTransactionPolicy.java Wed Apr 25 10:20:28 2007
@@ -22,6 +22,7 @@
import org.apache.camel.spi.Policy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
@@ -62,8 +63,28 @@
@Override
public String toString() {
- return "SpringTransactionPolicy[" + getNext() + "]";
+ return "SpringTransactionPolicy:"+propagationBehaviorToString(transactionTemplate.getPropagationBehavior())+"[" + getNext() + "]";
}
+
+ private String propagationBehaviorToString(int propagationBehavior) {
+ switch( propagationBehavior ) {
+ case TransactionDefinition.PROPAGATION_MANDATORY:
+ return "PROPAGATION_MANDATORY";
+ case TransactionDefinition.PROPAGATION_NESTED:
+ return "PROPAGATION_NESTED";
+ case TransactionDefinition.PROPAGATION_NEVER:
+ return "PROPAGATION_NEVER";
+ case TransactionDefinition.PROPAGATION_NOT_SUPPORTED:
+ return "PROPAGATION_NOT_SUPPORTED";
+ case TransactionDefinition.PROPAGATION_REQUIRED:
+ return "PROPAGATION_REQUIRED";
+ case TransactionDefinition.PROPAGATION_REQUIRES_NEW:
+ return "PROPAGATION_REQUIRES_NEW";
+ case TransactionDefinition.PROPAGATION_SUPPORTS:
+ return "PROPAGATION_SUPPORTS";
+ }
+ return "UNKOWN";
+ }
};
}