You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 17:44:08 UTC
svn commit: r777944 - in /camel/trunk/components/camel-jpa/src:
main/java/org/apache/camel/component/jpa/JpaConsumer.java
test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
Author: davsclaus
Date: Sat May 23 15:44:07 2009
New Revision: 777944
URL: http://svn.apache.org/viewvc?rev=777944&view=rev
Log:
CAMEL-1640: The JPA consumer is now also a BatchConsumer.
Added:
camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java (with props)
Modified:
camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=777944&r1=777943&r2=777944&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat May 23 15:44:07 2009
@@ -17,6 +17,7 @@
package org.apache.camel.component.jpa;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.List;
import javax.persistence.EntityManager;
@@ -24,6 +25,7 @@
import javax.persistence.PersistenceException;
import javax.persistence.Query;
+import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
@@ -36,7 +38,8 @@
/**
* @version $Revision$
*/
-public class JpaConsumer extends ScheduledPollConsumer {
+public class JpaConsumer extends ScheduledPollConsumer implements BatchConsumer {
+
private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class);
private final JpaEndpoint endpoint;
private final TransactionStrategy template;
@@ -46,6 +49,12 @@
private String namedQuery;
private String nativeQuery;
+ private class DataHolder {
+ private Exchange exchange;
+ private Object result;
+ private EntityManager manager;
+ }
+
public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
@@ -55,32 +64,68 @@
protected void poll() throws Exception {
template.execute(new JpaCallback() {
public Object doInJpa(EntityManager entityManager) throws PersistenceException {
+ List<DataHolder> answer = new ArrayList<DataHolder>();
+
Query query = getQueryFactory().createQuery(entityManager);
configureParameters(query);
List results = query.getResultList();
for (Object result : results) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing new entity: " + result);
- }
+ DataHolder holder = new DataHolder();
+ holder.manager = entityManager;
+ holder.result = result;
+ holder.exchange = createExchange(result);
+ answer.add(holder);
+ }
- if (lockEntity(result, entityManager)) {
- // lets turn the result into an exchange and fire it
- // into the processor
- Exchange exchange = createExchange(result);
- try {
- getProcessor().process(exchange);
- } catch (Exception e) {
- throw new PersistenceException(e);
- }
- getDeleteHandler().deleteObject(entityManager, result);
- }
+ try {
+ processBatch(answer);
+ } catch (Exception e) {
+ throw new PersistenceException(e);
}
+
entityManager.flush();
return null;
}
});
}
+ public void processBatch(List exchanges) throws Exception {
+ final List<DataHolder> list = exchanges;
+ if (list.isEmpty()) {
+ return;
+ }
+
+ EntityManager entityManager = list.get(0).manager;
+ int total = list.size();
+
+ for (int index = 0; index < total && isRunAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ Exchange exchange = list.get(index).exchange;
+ Object result = list.get(index).result;
+
+ // add current index and total as properties
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+ if (lockEntity(result, entityManager)) {
+
+ // process the current exchange
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing exchange: " + exchange);
+ }
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ throw new PersistenceException(e);
+ }
+
+ getDeleteHandler().deleteObject(entityManager, result);
+ }
+ }
+ }
+
+
// Properties
// -------------------------------------------------------------------------
public JpaEndpoint getEndpoint() {
Added: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java?rev=777944&view=auto
==============================================================================
--- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java (added)
+++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java Sat May 23 15:44:07 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.examples.SendEmail;
+import org.apache.camel.spring.SpringCamelContext;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.orm.jpa.JpaTemplate;
+import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * @version $Revision$
+ */
+public class JpaBatchConsumerTest extends ContextTestSupport {
+
+ protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+
+ protected ApplicationContext applicationContext;
+ protected JpaTemplate jpaTemplate;
+
+ public void testBatchConsumer() throws Exception {
+ // first create two records
+ template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("foo@beer.org"));
+ template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("bar@beer.org"));
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(2);
+ mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+ mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+ mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+ mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("jpa://" + SendEmail.class.getName() + "?delay=2000").to("mock:result");
+ }
+ };
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/processor/jpa/springJpaRouteTest.xml");
+ cleanupRepository();
+ return SpringCamelContext.springCamelContext(applicationContext);
+ }
+
+ protected void cleanupRepository() {
+ jpaTemplate = (JpaTemplate)applicationContext.getBean("jpaTemplate", JpaTemplate.class);
+
+ TransactionTemplate transactionTemplate = new TransactionTemplate();
+ transactionTemplate.setTransactionManager(new JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+ transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+
+ transactionTemplate.execute(new TransactionCallback() {
+ public Object doInTransaction(TransactionStatus arg0) {
+ List list = jpaTemplate.find(SELECT_ALL_STRING);
+ for (Object item : list) {
+ jpaTemplate.remove(item);
+ }
+ jpaTemplate.flush();
+ return Boolean.TRUE;
+ }
+ });
+ }
+
+
+}
Propchange: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date