You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/26 17:13:32 UTC

[2/6] camel git commit: Refresh entity manager on PersistenceException

Refresh entity manager on PersistenceException

The entity manager needs to be refreshed on every PersistenceException
to ensure that connections are properly taken from and returned to a
connection-pooled data source. This is especially important in cases
where the connection to the database is temporarily broken.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66346a98
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66346a98
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66346a98

Branch: refs/heads/master
Commit: 66346a98efd2d2ad51518833ecf3f131079e1936
Parents: 667c389
Author: James Thomas <ji...@gmail.com>
Authored: Tue Apr 25 06:23:55 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 26 19:06:20 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jpa/JpaConsumer.java | 102 ++++++++++---------
 1 file changed, 53 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66346a98/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 24b5b21..fd9cf3d 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -97,64 +97,68 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
             LOG.trace("Recreated EntityManager {} on {}", entityManager, this);
         }
 
-        Object messagePolled = transactionTemplate.execute(new TransactionCallback<Object>() {
-            public Object doInTransaction(TransactionStatus status) {
-                if (getEndpoint().isJoinTransaction()) {
-                    entityManager.joinTransaction();
-                }
+        Object messagePolled = null;
+        try {
+            messagePolled = transactionTemplate.execute(new TransactionCallback<Object>() {
+                public Object doInTransaction(TransactionStatus status) {
+                    if (getEndpoint().isJoinTransaction()) {
+                        entityManager.joinTransaction();
+                    }
 
-                Queue<DataHolder> answer = new LinkedList<DataHolder>();
+                    Queue<DataHolder> answer = new LinkedList<DataHolder>();
 
-                Query query = getQueryFactory().createQuery(entityManager);
-                configureParameters(query);
-                LOG.trace("Created query {}", query);
+                    Query query = getQueryFactory().createQuery(entityManager);
+                    configureParameters(query);
+                    LOG.trace("Created query {}", query);
 
-                List<?> results = query.getResultList();
-                LOG.trace("Got result list from query {}", results);
+                    List<?> results = query.getResultList();
+                    LOG.trace("Got result list from query {}", results);
 
-                for (Object result : results) {
-                    DataHolder holder = new DataHolder();
-                    holder.manager = entityManager;
-                    holder.result = result;
-                    holder.exchange = createExchange(result, entityManager);
-                    answer.add(holder);
-                }
+                    for (Object result : results) {
+                        DataHolder holder = new DataHolder();
+                        holder.manager = entityManager;
+                        holder.result = result;
+                        holder.exchange = createExchange(result, entityManager);
+                        answer.add(holder);
+                    }
 
-                PersistenceException cause = null;
-                int messagePolled = 0;
-                try {
-                    messagePolled = processBatch(CastUtils.cast(answer));
-                } catch (Exception e) {
-                    if (e instanceof PersistenceException) {
-                        cause = (PersistenceException) e;
-                    } else {
-                        cause = new PersistenceException(e);
+                    PersistenceException cause = null;
+                    int messagePolled = 0;
+                    try {
+                        messagePolled = processBatch(CastUtils.cast(answer));
+                    } catch (Exception e) {
+                        if (e instanceof PersistenceException) {
+                            cause = (PersistenceException) e;
+                        } else {
+                            cause = new PersistenceException(e);
+                        }
                     }
-                }
 
-                if (cause != null) {
-                    if (!isTransacted()) {
-                        LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
-                    } else {
-                        // Potentially EntityManager could be in an inconsistent state after transaction rollback,
-                        // so disposing it to have it recreated in next poll. cf. Java Persistence API 3.3.2 Transaction Rollback
-                        LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
-                        entityManager.close();
-                        entityManager = null;
-                        
-                        // rollback all by throwning exception
-                        throw cause;
+                    if (cause != null) {
+                        if (!isTransacted()) {
+                            LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
+                        } else {
+                            // rollback all by throwning exception
+                            throw cause;
+                        }
                     }
-                }
 
-                // commit
-                LOG.debug("Flushing EntityManager");
-                entityManager.flush();
-                // must clear after flush
-                entityManager.clear();
-                return messagePolled;
-            }
-        });
+                    // commit
+                    LOG.debug("Flushing EntityManager");
+                    entityManager.flush();
+                    // must clear after flush
+                    entityManager.clear();
+                    return messagePolled;
+                }
+            });
+        } catch (Exception e) {
+            // Potentially EntityManager could be in an inconsistent state after transaction rollback,
+            // so disposing it to have it recreated in next poll. cf. Java Persistence API 3.3.2 Transaction Rollback
+            LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
+            entityManager.close();
+            entityManager = null;
+            throw new PersistenceException(e);
+        }
 
         return getEndpoint().getCamelContext().getTypeConverter().convertTo(int.class, messagePolled);
     }