You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 17:44:08 UTC

svn commit: r777944 - in /camel/trunk/components/camel-jpa/src: main/java/org/apache/camel/component/jpa/JpaConsumer.java test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java

Author: davsclaus
Date: Sat May 23 15:44:07 2009
New Revision: 777944

URL: http://svn.apache.org/viewvc?rev=777944&view=rev
Log:
CAMEL-1640: The JPA consumer is now also a BatchConsumer.

Added:
    camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java   (with props)
Modified:
    camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java

Modified: camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?rev=777944&r1=777943&r2=777944&view=diff
==============================================================================
--- camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ camel/trunk/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Sat May 23 15:44:07 2009
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jpa;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 
 import javax.persistence.EntityManager;
@@ -24,6 +25,7 @@
 import javax.persistence.PersistenceException;
 import javax.persistence.Query;
 
+import org.apache.camel.BatchConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollConsumer;
@@ -36,7 +38,8 @@
 /**
  * @version $Revision$
  */
-public class JpaConsumer extends ScheduledPollConsumer {
+public class JpaConsumer extends ScheduledPollConsumer implements BatchConsumer {
+
     private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class);
     private final JpaEndpoint endpoint;
     private final TransactionStrategy template;
@@ -46,6 +49,12 @@
     private String namedQuery;
     private String nativeQuery;
 
+    private class DataHolder { // groups one query result with its exchange and the EntityManager used to load it
+        private Exchange exchange; // set in poll() from createExchange(result)
+        private Object result; // one element of query.getResultList()
+        private EntityManager manager; // the EntityManager handed to the JpaCallback in poll()
+    }
+
     public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -55,32 +64,68 @@
     protected void poll() throws Exception {
         template.execute(new JpaCallback() {
             public Object doInJpa(EntityManager entityManager) throws PersistenceException {
+                List<DataHolder> answer = new ArrayList<DataHolder>();
+
                 Query query = getQueryFactory().createQuery(entityManager);
                 configureParameters(query);
                 List results = query.getResultList();
                 for (Object result : results) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Processing new entity: " + result);
-                    }
+                    DataHolder holder = new DataHolder();
+                    holder.manager = entityManager;
+                    holder.result = result;
+                    holder.exchange = createExchange(result);
+                    answer.add(holder);
+                }
 
-                    if (lockEntity(result, entityManager)) {
-                        // lets turn the result into an exchange and fire it
-                        // into the processor
-                        Exchange exchange = createExchange(result);
-                        try {
-                            getProcessor().process(exchange);
-                        } catch (Exception e) {
-                            throw new PersistenceException(e);
-                        }
-                        getDeleteHandler().deleteObject(entityManager, result);
-                    }
+                try {
+                    processBatch(answer);
+                } catch (Exception e) {
+                    throw new PersistenceException(e);
                 }
+
                 entityManager.flush();
                 return null;
             }
         });
     }
 
+    public void processBatch(List exchanges) throws Exception {
+        final List<DataHolder> list = exchanges; // unchecked: relies on the caller passing DataHolder entries, as poll() does
+        if (list.isEmpty()) {
+            return;
+        }
+        // poll() stores the same EntityManager in every holder, so the first holder's manager is used for the whole batch
+        EntityManager entityManager = list.get(0).manager;
+        int total = list.size();
+
+        for (int index = 0; index < total && isRunAllowed(); index++) {
+            // isRunAllowed() is re-checked on every iteration, so the batch ends early once it returns false
+            Exchange exchange = list.get(index).exchange;
+            Object result = list.get(index).result;
+
+            // expose the batch index, the batch size and a last-in-batch flag as exchange properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            if (lockEntity(result, entityManager)) {
+                // NOTE(review): entities that fail lockEntity are skipped here but are still counted in BATCH_SIZE above
+                // log and process the current exchange; a processor failure is rethrown as PersistenceException
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Processing exchange: " + exchange);
+                }
+                try {
+                    getProcessor().process(exchange);
+                } catch (Exception e) {
+                    throw new PersistenceException(e);
+                }
+                // reached only when the processor returned without throwing
+                getDeleteHandler().deleteObject(entityManager, result);
+            }
+        }
+    }
+
+
     // Properties
     // -------------------------------------------------------------------------
     public JpaEndpoint getEndpoint() {

Added: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java?rev=777944&view=auto
==============================================================================
--- camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java (added)
+++ camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java Sat May 23 15:44:07 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.jpa;
+
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.examples.SendEmail;
+import org.apache.camel.spring.SpringCamelContext;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.orm.jpa.JpaTemplate;
+import org.springframework.orm.jpa.JpaTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/** Checks that exchanges polled by the JPA consumer carry the batch index, size and completion properties.
+ * @version $Revision$
+ */
+public class JpaBatchConsumerTest extends ContextTestSupport {
+
+    protected static final String SELECT_ALL_STRING = "select x from " + SendEmail.class.getName() + " x";
+
+    protected ApplicationContext applicationContext;
+    protected JpaTemplate jpaTemplate;
+    /** Sends two SendEmail bodies to the jpa endpoint and asserts the batch properties seen at mock:result. */
+    public void testBatchConsumer() throws Exception {
+        // send two SendEmail bodies to the jpa endpoint; the mock below expects them back as one batch of two
+        template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("foo@beer.org"));
+        template.sendBody("jpa://" + SendEmail.class.getName(), new SendEmail("bar@beer.org"));
+        // expect both exchanges: BATCH_INDEX 0 and 1, BATCH_SIZE 2, and BATCH_COMPLETE true only on the second
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
+
+        assertMockEndpointsSatisfied();
+    }
+    /** Routes from the SendEmail jpa endpoint (configured with delay=2000) to mock:result. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("jpa://" + SendEmail.class.getName() + "?delay=2000").to("mock:result");
+            }
+        };
+    }
+    /** Loads the Spring application context, runs cleanupRepository(), and returns a SpringCamelContext built on it. */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/processor/jpa/springJpaRouteTest.xml");
+        cleanupRepository();
+        return SpringCamelContext.springCamelContext(applicationContext);
+    }
+    /** Removes every entity found by SELECT_ALL_STRING and flushes, inside a PROPAGATION_REQUIRED transaction. */
+    protected void cleanupRepository() {
+        jpaTemplate = (JpaTemplate)applicationContext.getBean("jpaTemplate", JpaTemplate.class);
+
+        TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new JpaTransactionManager(jpaTemplate.getEntityManagerFactory()));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+
+        transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus arg0) {
+                List list = jpaTemplate.find(SELECT_ALL_STRING);
+                for (Object item : list) {
+                    jpaTemplate.remove(item);
+                }
+                jpaTemplate.flush();
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+
+}

Propchange: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaBatchConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date