You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/26 17:13:33 UTC
[3/6] camel git commit: Refresh entity manager on PersistenceException
Refresh entity manager on PersistenceException
The entity manager needs to be refreshed on every PersistenceException
to ensure connections are taken from and returned to a connection-pooled
data source. This is especially important in cases where the connection to
the database is temporarily broken.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/62988ca4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/62988ca4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/62988ca4
Branch: refs/heads/camel-2.18.x
Commit: 62988ca426992bd72e8b94aea7274aaf5d152bda
Parents: 40da970
Author: James Thomas <ji...@gmail.com>
Authored: Tue Apr 25 06:23:55 2017 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 26 19:08:22 2017 +0200
----------------------------------------------------------------------
.../apache/camel/component/jpa/JpaConsumer.java | 102 ++++++++++---------
1 file changed, 53 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/62988ca4/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index f85d650..100b9bb 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -97,64 +97,68 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Recreated EntityManager {} on {}", entityManager, this);
}
- Object messagePolled = transactionTemplate.execute(new TransactionCallback<Object>() {
- public Object doInTransaction(TransactionStatus status) {
- if (getEndpoint().isJoinTransaction()) {
- entityManager.joinTransaction();
- }
+ Object messagePolled = null;
+ try {
+ messagePolled = transactionTemplate.execute(new TransactionCallback<Object>() {
+ public Object doInTransaction(TransactionStatus status) {
+ if (getEndpoint().isJoinTransaction()) {
+ entityManager.joinTransaction();
+ }
- Queue<DataHolder> answer = new LinkedList<DataHolder>();
+ Queue<DataHolder> answer = new LinkedList<DataHolder>();
- Query query = getQueryFactory().createQuery(entityManager);
- configureParameters(query);
- LOG.trace("Created query {}", query);
+ Query query = getQueryFactory().createQuery(entityManager);
+ configureParameters(query);
+ LOG.trace("Created query {}", query);
- List<?> results = query.getResultList();
- LOG.trace("Got result list from query {}", results);
+ List<?> results = query.getResultList();
+ LOG.trace("Got result list from query {}", results);
- for (Object result : results) {
- DataHolder holder = new DataHolder();
- holder.manager = entityManager;
- holder.result = result;
- holder.exchange = createExchange(result, entityManager);
- answer.add(holder);
- }
+ for (Object result : results) {
+ DataHolder holder = new DataHolder();
+ holder.manager = entityManager;
+ holder.result = result;
+ holder.exchange = createExchange(result, entityManager);
+ answer.add(holder);
+ }
- PersistenceException cause = null;
- int messagePolled = 0;
- try {
- messagePolled = processBatch(CastUtils.cast(answer));
- } catch (Exception e) {
- if (e instanceof PersistenceException) {
- cause = (PersistenceException) e;
- } else {
- cause = new PersistenceException(e);
+ PersistenceException cause = null;
+ int messagePolled = 0;
+ try {
+ messagePolled = processBatch(CastUtils.cast(answer));
+ } catch (Exception e) {
+ if (e instanceof PersistenceException) {
+ cause = (PersistenceException) e;
+ } else {
+ cause = new PersistenceException(e);
+ }
}
- }
- if (cause != null) {
- if (!isTransacted()) {
- LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
- } else {
- // Potentially EntityManager could be in an inconsistent state after transaction rollback,
- // so disposing it to have it recreated in next poll. cf. Java Persistence API 3.3.2 Transaction Rollback
- LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
- entityManager.close();
- entityManager = null;
-
- // rollback all by throwning exception
- throw cause;
+ if (cause != null) {
+ if (!isTransacted()) {
+ LOG.warn("Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.", cause.getMessage(), cause);
+ } else {
+ // rollback all by throwning exception
+ throw cause;
+ }
}
- }
- // commit
- LOG.debug("Flushing EntityManager");
- entityManager.flush();
- // must clear after flush
- entityManager.clear();
- return messagePolled;
- }
- });
+ // commit
+ LOG.debug("Flushing EntityManager");
+ entityManager.flush();
+ // must clear after flush
+ entityManager.clear();
+ return messagePolled;
+ }
+ });
+ } catch (Exception e) {
+ // Potentially EntityManager could be in an inconsistent state after transaction rollback,
+ // so disposing it to have it recreated in next poll. cf. Java Persistence API 3.3.2 Transaction Rollback
+ LOG.info("Disposing EntityManager {} on {} due to coming transaction rollback", entityManager, this);
+ entityManager.close();
+ entityManager = null;
+ throw new PersistenceException(e);
+ }
return getEndpoint().getCamelContext().getTypeConverter().convertTo(int.class, messagePolled);
}