You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/19 13:01:18 UTC

[camel] branch main updated: (chores) camel-jpa: fixing a few code violations from SonarCloud

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 94808cf96e6 (chores) camel-jpa: fixing a few code violations from SonarCloud
94808cf96e6 is described below

commit 94808cf96e657dd8b60109172e963ad4a4051259
Author: Gilvan Filho <gi...@gmail.com>
AuthorDate: Thu Oct 13 12:26:09 2022 -0300

    (chores) camel-jpa: fixing a few code violations from SonarCloud
---
 .../component/jpa/DefaultTransactionStrategy.java  |  20 +--
 .../apache/camel/component/jpa/DeleteHandler.java  |   2 +-
 .../apache/camel/component/jpa/JpaComponent.java   |  26 ++-
 .../apache/camel/component/jpa/JpaConsumer.java    | 171 +++++++++----------
 .../camel/component/jpa/JpaPollingConsumer.java    |  12 +-
 .../apache/camel/component/jpa/JpaProducer.java    |  78 ++++-----
 .../idempotent/jpa/JpaMessageIdRepository.java     | 181 ++++++++++-----------
 .../camel/component/jpa/JpaComponentTest.java      | 104 ++++++------
 .../camel/component/jpa/JpaEndpointTest.java       |  20 ++-
 .../org/apache/camel/component/jpa/JpaTest.java    |   2 +-
 .../jpa/JpaWithNamedQueryAndParametersTest.java    |   2 +-
 .../jpa/JpaRouteSharedEntityManagerTest.java       |   2 +-
 12 files changed, 304 insertions(+), 316 deletions(-)

diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DefaultTransactionStrategy.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DefaultTransactionStrategy.java
index 709d8fbc4e1..3e2be853794 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DefaultTransactionStrategy.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DefaultTransactionStrategy.java
@@ -21,18 +21,14 @@ import javax.persistence.EntityManagerFactory;
 import org.springframework.orm.jpa.JpaTransactionManager;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
 public class DefaultTransactionStrategy implements TransactionStrategy {
     private final TransactionTemplate transactionTemplate;
     private final PlatformTransactionManager transactionManager;
-    private final EntityManagerFactory entityManagerFactory;
 
     public DefaultTransactionStrategy(PlatformTransactionManager transactionManager,
                                       EntityManagerFactory entityManagerFactory) {
-        this.entityManagerFactory = entityManagerFactory;
         if (transactionManager == null) {
             this.transactionManager = createTransactionManager(entityManagerFactory);
         } else {
@@ -43,11 +39,9 @@ public class DefaultTransactionStrategy implements TransactionStrategy {
 
     @Override
     public void executeInTransaction(Runnable runnable) {
-        transactionTemplate.execute(new TransactionCallback<Object>() {
-            public Object doInTransaction(TransactionStatus status) {
-                runnable.run();
-                return null;
-            }
+        transactionTemplate.execute(status -> {
+            runnable.run();
+            return null;
         });
     }
 
@@ -62,9 +56,9 @@ public class DefaultTransactionStrategy implements TransactionStrategy {
     }
 
     protected TransactionTemplate createTransactionTemplate() {
-        TransactionTemplate transactionTemplate = new TransactionTemplate(getTransactionManager());
-        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
-        transactionTemplate.afterPropertiesSet();
-        return transactionTemplate;
+        TransactionTemplate newTransactionTemplate = new TransactionTemplate(getTransactionManager());
+        newTransactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        newTransactionTemplate.afterPropertiesSet();
+        return newTransactionTemplate;
     }
 }
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java
index 112c60394c5..ac34e43b5e3 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/DeleteHandler.java
@@ -34,5 +34,5 @@ public interface DeleteHandler<T> {
      * @param entityBean    the entity bean that has been processed and should be deleted
      * @param exchange      the exchange that could be used to update the entityBean
      */
-    void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange);
+    void deleteObject(EntityManager entityManager, T entityBean, Exchange exchange);
 }
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
index 64453a3b1f9..29447f38515 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaComponent.java
@@ -57,6 +57,7 @@ public class JpaComponent extends DefaultComponent {
     private Map<String, Class<?>> aliases = new HashMap<>();
 
     public JpaComponent() {
+        // default constructor
     }
 
     // Properties
@@ -175,10 +176,7 @@ public class JpaComponent extends DefaultComponent {
         return endpoint;
     }
 
-    @Override
-    protected void doInit() throws Exception {
-        super.doInit();
-
+    private void initEntityManagerFactory() {
         // lookup entity manager factory and use it if only one provided
         if (entityManagerFactory == null) {
             Map<String, EntityManagerFactory> map
@@ -196,12 +194,9 @@ public class JpaComponent extends DefaultComponent {
         } else {
             LOG.info("Using EntityManagerFactory configured: {}", entityManagerFactory);
         }
+    }
 
-        if (transactionStrategy != null) {
-            LOG.info("Using TransactionStrategy configured: {}", transactionStrategy);
-            return;
-        }
-
+    private void initTransactionManager() {
         // lookup transaction manager and use it if only one provided
         if (transactionManager == null) {
             Map<String, PlatformTransactionManager> map
@@ -233,6 +228,19 @@ public class JpaComponent extends DefaultComponent {
                 }
             }
         }
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        initEntityManagerFactory();
+
+        if (transactionStrategy != null) {
+            LOG.info("Using TransactionStrategy configured: {}", transactionStrategy);
+            return;
+        }
+
+        initTransactionManager();
 
         // warn about missing configuration
         if (entityManagerFactory == null) {
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index da4acb0a84d..fa42063ff84 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -83,12 +83,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         this.transactionStrategy = endpoint.getTransactionStrategy();
     }
 
-    @Override
-    protected int poll() throws Exception {
-        // must reset for each poll
-        shutdownRunningTask = null;
-        pendingExchanges = 0;
-
+    private void recreateEntityManagerIfNeeded() {
         // Recreate EntityManager in case it is disposed due to transaction rollback
         if (entityManager == null) {
             if (getEndpoint().isSharedEntityManager()) {
@@ -98,61 +93,65 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
             }
             LOG.trace("Recreated EntityManager {} on {}", entityManager, this);
         }
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        recreateEntityManagerIfNeeded();
 
         final int[] messagePolled = { 0 };
         try {
-            transactionStrategy.executeInTransaction(new Runnable() {
-                @Override
-                public void run() {
-                    if (getEndpoint().isJoinTransaction()) {
-                        entityManager.joinTransaction();
-                    }
+            transactionStrategy.executeInTransaction(() -> {
+                if (getEndpoint().isJoinTransaction()) {
+                    entityManager.joinTransaction();
+                }
 
-                    Queue<DataHolder> answer = new LinkedList<>();
+                Queue<DataHolder> answer = new LinkedList<>();
 
-                    Query query = getQueryFactory().createQuery(entityManager);
-                    configureParameters(query);
-                    LOG.trace("Created query {}", query);
+                Query toExecute = getQueryFactory().createQuery(entityManager);
+                configureParameters(toExecute);
+                LOG.trace("Created query {}", toExecute);
 
-                    List<?> results = query.getResultList();
-                    LOG.trace("Got result list from query {}", results);
+                List<?> results = toExecute.getResultList();
+                LOG.trace("Got result list from query {}", results);
 
-                    for (Object result : results) {
-                        DataHolder holder = new DataHolder();
-                        holder.manager = entityManager;
-                        holder.result = result;
-                        holder.exchange = createExchange(result, entityManager);
-                        answer.add(holder);
-                    }
+                for (Object result : results) {
+                    DataHolder holder = new DataHolder();
+                    holder.manager = entityManager;
+                    holder.result = result;
+                    holder.exchange = createExchange(result, entityManager);
+                    answer.add(holder);
+                }
 
-                    PersistenceException cause = null;
-                    try {
-                        messagePolled[0] = processBatch(CastUtils.cast(answer));
-                    } catch (Exception e) {
-                        if (e instanceof PersistenceException) {
-                            cause = (PersistenceException) e;
-                        } else {
-                            cause = new PersistenceException(e);
-                        }
-                    }
+                PersistenceException cause = null;
+                try {
+                    messagePolled[0] = processBatch(CastUtils.cast(answer));
+                } catch (PersistenceException e) {
+                    cause = e;
+                } catch (Exception e) {
+                    cause = new PersistenceException(e);
+                }
 
-                    if (cause != null) {
-                        if (!isTransacted()) {
-                            LOG.warn(
-                                    "Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.",
-                                    cause.getMessage(), cause);
-                        } else {
-                            // rollback all by throwing exception
-                            throw cause;
-                        }
+                if (cause != null) {
+                    if (!isTransacted()) {
+                        LOG.warn(
+                                "Error processing last message due: {}. Will commit all previous successful processed message, and ignore this last failure.",
+                                cause.getMessage(), cause);
+                    } else {
+                        // rollback all by throwing exception
+                        throw cause;
                     }
-
-                    // commit
-                    LOG.debug("Flushing EntityManager");
-                    entityManager.flush();
-                    // must clear after flush
-                    entityManager.clear();
                 }
+
+                // commit
+                LOG.debug("Flushing EntityManager");
+                entityManager.flush();
+                // must clear after flush
+                entityManager.clear();
             });
         } catch (Exception e) {
             // Potentially EntityManager could be in an inconsistent state after transaction rollback,
@@ -180,7 +179,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         for (int index = 0; index < total && isBatchAllowed(); index++) {
             // only loop if we are started (allowed to run)
             DataHolder holder = org.apache.camel.util.ObjectHelper.cast(DataHolder.class, exchanges.poll());
-            EntityManager entityManager = holder.manager;
+            EntityManager batchEntityManager = holder.manager;
             Exchange exchange = holder.exchange;
             Object result = holder.result;
 
@@ -191,9 +190,9 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
 
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
-            if (lockEntity(result, entityManager)) {
+            if (lockEntity(result, batchEntityManager)) {
                 // Run the @PreConsumed callback
-                createPreDeleteHandler().deleteObject(entityManager, result, exchange);
+                createPreDeleteHandler().deleteObject(batchEntityManager, result, exchange);
 
                 // process the current exchange
                 LOG.debug("Processing exchange: {}", exchange);
@@ -209,7 +208,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
                         throw exchange.getException();
                     } else {
                         // Run the @Consumed callback
-                        getDeleteHandler().deleteObject(entityManager, result, exchange);
+                        getDeleteHandler().deleteObject(batchEntityManager, result, exchange);
                     }
                 } finally {
                     releaseExchange(exchange, false);
@@ -370,10 +369,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
             }
             //TODO: Find if possible an alternative way to handle results of native queries.
             //Result of native queries are Arrays and cannot be locked by all JPA Providers.
-            if (entity.getClass().isArray()) {
-                return true;
-            }
-            return false;
+            return entity.getClass().isArray();
         }
     }
 
@@ -430,16 +426,13 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
                 // Inspect the parameters of the @PreConsumed method
                 final Method method = methods.get(0);
                 final boolean useExchangeParameter = checkParameters(method);
-                return new DeleteHandler<Object>() {
-                    @Override
-                    public void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange) {
-                        // The entityBean could be an Object array
-                        if (entityType.isInstance(entityBean)) {
-                            if (useExchangeParameter) {
-                                ObjectHelper.invokeMethod(method, entityBean, exchange);
-                            } else {
-                                ObjectHelper.invokeMethod(method, entityBean);
-                            }
+                return (EntityManager em, Object entityBean, Exchange exchange) -> {
+                    // The entityBean could be an Object array
+                    if (entityType.isInstance(entityBean)) {
+                        if (useExchangeParameter) {
+                            ObjectHelper.invokeMethod(method, entityBean, exchange);
+                        } else {
+                            ObjectHelper.invokeMethod(method, entityBean);
                         }
                     }
                 };
@@ -447,11 +440,7 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
         }
 
         // else do nothing
-        return new DeleteHandler<Object>() {
-            @Override
-            public void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange) {
-                // Do nothing
-            }
+        return (EntityManager em, Object entityBean, Exchange exchange) -> {
         };
     }
 
@@ -466,41 +455,33 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
             } else if (methods.size() == 1) {
                 final Method method = methods.get(0);
                 final boolean useExchangeParameter = checkParameters(method);
-                return new DeleteHandler<Object>() {
-                    public void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange) {
-                        if (entityType.isInstance(entityBean)) {
-                            if (useExchangeParameter) {
-                                ObjectHelper.invokeMethod(method, entityBean, exchange);
-                            } else {
-                                ObjectHelper.invokeMethod(method, entityBean);
-                            }
+                return (EntityManager em, Object entityBean, Exchange exchange) -> {
+                    if (entityType.isInstance(entityBean)) {
+                        if (useExchangeParameter) {
+                            ObjectHelper.invokeMethod(method, entityBean, exchange);
+                        } else {
+                            ObjectHelper.invokeMethod(method, entityBean);
                         }
                     }
                 };
             }
         }
         if (getEndpoint().isConsumeDelete()) {
-            return new DeleteHandler<Object>() {
-                public void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange) {
-                    entityManager.remove(entityBean);
-                }
-            };
-        } else {
-            return new DeleteHandler<Object>() {
-                public void deleteObject(EntityManager entityManager, Object entityBean, Exchange exchange) {
-                    // do nothing
-                }
-            };
+            return (EntityManager em, Object entityBean, Exchange exchange) -> em.remove(entityBean);
         }
+
+        return (EntityManager em, Object entityBean, Exchange exchange) -> {
+        };
+
     }
 
     protected boolean checkParameters(Method method) {
         boolean result = false;
-        Class<?>[] parameters = method.getParameterTypes();
-        if (parameters.length == 1 && parameters[0].isAssignableFrom(Exchange.class)) {
+        Class<?>[] receivedParameters = method.getParameterTypes();
+        if (receivedParameters.length == 1 && receivedParameters[0].isAssignableFrom(Exchange.class)) {
             result = true;
         }
-        if (parameters.length > 0 && !result) {
+        if (receivedParameters.length > 0 && !result) {
             throw new IllegalStateException("@PreConsumed annotated method cannot have parameter other than Exchange");
         }
         return result;
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
index 538ed3a4973..784f6acb7a1 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaPollingConsumer.java
@@ -44,7 +44,7 @@ public class JpaPollingConsumer extends PollingConsumerSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(JpaPollingConsumer.class);
 
-    private transient ExecutorService executorService;
+    private volatile ExecutorService executorService;
     private final EntityManagerFactory entityManagerFactory;
     private final TransactionStrategy transactionStrategy;
     private String query;
@@ -137,19 +137,19 @@ public class JpaPollingConsumer extends PollingConsumerSupport {
                     entityManager.joinTransaction();
                 }
 
-                Query query = getQueryFactory().createQuery(entityManager);
-                configureParameters(query);
+                Query innerQuery = getQueryFactory().createQuery(entityManager);
+                configureParameters(innerQuery);
 
                 if (getEndpoint().isConsumeLockEntity()) {
-                    query.setLockMode(getLockModeType());
+                    innerQuery.setLockMode(getLockModeType());
                 }
 
-                LOG.trace("Created query {}", query);
+                LOG.trace("Created query {}", innerQuery);
 
                 Object answer;
 
                 try {
-                    List<?> results = query.getResultList();
+                    List<?> results = innerQuery.getResultList();
 
                     if (results != null && results.size() == 1) {
                         // we only have 1 entity so return that
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
index 3c510db70a7..44d6273e047 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaProducer.java
@@ -173,30 +173,27 @@ public class JpaProducer extends DefaultProducer {
     }
 
     protected void processQuery(Exchange exchange, EntityManager entityManager) {
-        Query query = getQueryFactory().createQuery(entityManager);
-        configureParameters(query, exchange);
+        Query innerQuery = getQueryFactory().createQuery(entityManager);
+        configureParameters(innerQuery, exchange);
 
-        transactionStrategy.executeInTransaction(new Runnable() {
-            @Override
-            public void run() {
-                if (getEndpoint().isJoinTransaction()) {
-                    entityManager.joinTransaction();
-                }
+        transactionStrategy.executeInTransaction(() -> {
+            if (getEndpoint().isJoinTransaction()) {
+                entityManager.joinTransaction();
+            }
 
-                Message target;
-                if (ExchangeHelper.isOutCapable(exchange)) {
-                    target = exchange.getOut();
-                    // preserve headers
-                    target.getHeaders().putAll(exchange.getIn().getHeaders());
-                } else {
-                    target = exchange.getIn();
-                }
-                Object answer = isUseExecuteUpdate() ? query.executeUpdate() : query.getResultList();
-                target.setBody(answer);
+            Message target;
+            if (ExchangeHelper.isOutCapable(exchange)) {
+                target = exchange.getMessage();
+                // preserve headers
+                target.getHeaders().putAll(exchange.getIn().getHeaders());
+            } else {
+                target = exchange.getIn();
+            }
+            Object answer = isUseExecuteUpdate() ? innerQuery.executeUpdate() : innerQuery.getResultList();
+            target.setBody(answer);
 
-                if (getEndpoint().isFlushOnSend()) {
-                    entityManager.flush();
-                }
+            if (getEndpoint().isFlushOnSend()) {
+                entityManager.flush();
             }
         });
     }
@@ -229,29 +226,26 @@ public class JpaProducer extends DefaultProducer {
         final Object key = exchange.getMessage().getBody();
 
         if (key != null) {
-            transactionStrategy.executeInTransaction(new Runnable() {
-                @Override
-                public void run() {
-                    if (getEndpoint().isJoinTransaction()) {
-                        entityManager.joinTransaction();
-                    }
+            transactionStrategy.executeInTransaction(() -> {
+                if (getEndpoint().isJoinTransaction()) {
+                    entityManager.joinTransaction();
+                }
 
-                    Object answer = entityManager.find(getEndpoint().getEntityType(), key);
-                    LOG.debug("Find: {} -> {}", key, answer);
+                Object answer = entityManager.find(getEndpoint().getEntityType(), key);
+                LOG.debug("Find: {} -> {}", key, answer);
 
-                    Message target;
-                    if (ExchangeHelper.isOutCapable(exchange)) {
-                        target = exchange.getOut();
-                        // preserve headers
-                        target.getHeaders().putAll(exchange.getIn().getHeaders());
-                    } else {
-                        target = exchange.getIn();
-                    }
-                    target.setBody(answer);
+                Message target;
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    target = exchange.getMessage();
+                    // preserve headers
+                    target.getHeaders().putAll(exchange.getIn().getHeaders());
+                } else {
+                    target = exchange.getIn();
+                }
+                target.setBody(answer);
 
-                    if (getEndpoint().isFlushOnSend()) {
-                        entityManager.flush();
-                    }
+                if (getEndpoint().isFlushOnSend()) {
+                    entityManager.flush();
                 }
             });
         }
@@ -292,7 +286,7 @@ public class JpaProducer extends DefaultProducer {
                         Collection<?> collection = (Collection<?>) values;
                         // need to create a list to store returned values as they can be updated
                         // by JPA such as setting auto assigned ids
-                        Collection managedCollection = new ArrayList<>(collection.size());
+                        Collection<Object> managedCollection = new ArrayList<>(collection.size());
                         Object managedEntity;
                         for (Object entity : collection) {
                             if (!getEndpoint().isRemove()) {
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
index f3e2f5b2d53..acf6c1bb22d 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/processor/idempotent/jpa/JpaMessageIdRepository.java
@@ -46,6 +46,9 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
     protected static final String QUERY_CLEAR_STRING
             = "select x from " + MessageProcessed.class.getName() + " x where x.processorName = ?1";
 
+    private static final String SOMETHING_WENT_WRONG
+            = "Something went wrong trying to add message to repository %s";
+
     private static final Logger LOG = LoggerFactory.getLogger(JpaMessageIdRepository.class);
 
     private final String processorName;
@@ -90,37 +93,35 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
                 = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager, true);
         // Run this in single transaction.
         final Boolean[] rc = new Boolean[1];
-        transactionStrategy.executeInTransaction(new Runnable() {
-            public void run() {
-                if (isJoinTransaction()) {
-                    entityManager.joinTransaction();
-                }
+        transactionStrategy.executeInTransaction(() -> {
+            if (isJoinTransaction()) {
+                entityManager.joinTransaction();
+            }
 
+            try {
+                List<?> list = query(entityManager, messageId);
+                if (list.isEmpty()) {
+                    MessageProcessed processed = new MessageProcessed();
+                    processed.setProcessorName(processorName);
+                    processed.setMessageId(messageId);
+                    processed.setCreatedAt(new Date());
+                    entityManager.persist(processed);
+                    entityManager.flush();
+                    entityManager.close();
+                    rc[0] = Boolean.TRUE;
+                } else {
+                    rc[0] = Boolean.FALSE;
+                }
+            } catch (Exception ex) {
+                String contextInfo = String.format(SOMETHING_WENT_WRONG, ex.getMessage());
+                throw new PersistenceException(contextInfo, ex);
+            } finally {
                 try {
-                    List<?> list = query(entityManager, messageId);
-                    if (list.isEmpty()) {
-                        MessageProcessed processed = new MessageProcessed();
-                        processed.setProcessorName(processorName);
-                        processed.setMessageId(messageId);
-                        processed.setCreatedAt(new Date());
-                        entityManager.persist(processed);
-                        entityManager.flush();
+                    if (entityManager.isOpen()) {
                         entityManager.close();
-                        rc[0] = Boolean.TRUE;
-                    } else {
-                        rc[0] = Boolean.FALSE;
-                    }
-                } catch (Exception ex) {
-                    LOG.error("Something went wrong trying to add message to repository {}", ex.getMessage(), ex);
-                    throw new PersistenceException(ex);
-                } finally {
-                    try {
-                        if (entityManager.isOpen()) {
-                            entityManager.close();
-                        }
-                    } catch (Exception e) {
-                        // ignore
                     }
+                } catch (Exception e) {
+                    // ignore
                 }
             }
         });
@@ -142,29 +143,27 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
 
         // Run this in single transaction.
         final Boolean[] rc = new Boolean[1];
-        transactionStrategy.executeInTransaction(new Runnable() {
-            public void run() {
-                if (isJoinTransaction()) {
-                    entityManager.joinTransaction();
+        transactionStrategy.executeInTransaction(() -> {
+            if (isJoinTransaction()) {
+                entityManager.joinTransaction();
+            }
+            try {
+                List<?> list = query(entityManager, messageId);
+                if (list.isEmpty()) {
+                    rc[0] = Boolean.FALSE;
+                } else {
+                    rc[0] = Boolean.TRUE;
                 }
+            } catch (Exception ex) {
+                String contextInfo = String.format(SOMETHING_WENT_WRONG, ex.getMessage());
+                throw new PersistenceException(contextInfo, ex);
+            } finally {
                 try {
-                    List<?> list = query(entityManager, messageId);
-                    if (list.isEmpty()) {
-                        rc[0] = Boolean.FALSE;
-                    } else {
-                        rc[0] = Boolean.TRUE;
-                    }
-                } catch (Exception ex) {
-                    LOG.error("Something went wrong trying to check message in repository {}", ex.getMessage(), ex);
-                    throw new PersistenceException(ex);
-                } finally {
-                    try {
-                        if (entityManager.isOpen()) {
-                            entityManager.close();
-                        }
-                    } catch (Exception e) {
-                        // ignore
+                    if (entityManager.isOpen()) {
+                        entityManager.close();
                     }
+                } catch (Exception e) {
+                    // ignore
                 }
             }
         });
@@ -185,33 +184,31 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
                 = getTargetEntityManager(exchange, entityManagerFactory, true, sharedEntityManager, true);
 
         Boolean rc[] = new Boolean[1];
-        transactionStrategy.executeInTransaction(new Runnable() {
-            public void run() {
-                if (isJoinTransaction()) {
-                    entityManager.joinTransaction();
+        transactionStrategy.executeInTransaction(() -> {
+            if (isJoinTransaction()) {
+                entityManager.joinTransaction();
+            }
+            try {
+                List<?> list = query(entityManager, messageId);
+                if (list.isEmpty()) {
+                    rc[0] = Boolean.FALSE;
+                } else {
+                    MessageProcessed processed = (MessageProcessed) list.get(0);
+                    entityManager.remove(processed);
+                    entityManager.flush();
+                    entityManager.close();
+                    rc[0] = Boolean.TRUE;
                 }
+            } catch (Exception ex) {
+                String contextInfo = String.format(SOMETHING_WENT_WRONG, ex.getMessage());
+                throw new PersistenceException(contextInfo, ex);
+            } finally {
                 try {
-                    List<?> list = query(entityManager, messageId);
-                    if (list.isEmpty()) {
-                        rc[0] = Boolean.FALSE;
-                    } else {
-                        MessageProcessed processed = (MessageProcessed) list.get(0);
-                        entityManager.remove(processed);
-                        entityManager.flush();
+                    if (entityManager.isOpen()) {
                         entityManager.close();
-                        rc[0] = Boolean.TRUE;
-                    }
-                } catch (Exception ex) {
-                    LOG.error("Something went wrong trying to remove message to repository {}", ex.getMessage(), ex);
-                    throw new PersistenceException(ex);
-                } finally {
-                    try {
-                        if (entityManager.isOpen()) {
-                            entityManager.close();
-                        }
-                    } catch (Exception e) {
-                        // ignore
                     }
+                } catch (Exception e) {
+                    // ignore
                 }
             }
         });
@@ -236,33 +233,31 @@ public class JpaMessageIdRepository extends ServiceSupport implements Idempotent
     public void clear() {
         final EntityManager entityManager = getTargetEntityManager(null, entityManagerFactory, true, sharedEntityManager, true);
 
-        transactionStrategy.executeInTransaction(new Runnable() {
-            public void run() {
-                if (isJoinTransaction()) {
-                    entityManager.joinTransaction();
+        transactionStrategy.executeInTransaction(() -> {
+            if (isJoinTransaction()) {
+                entityManager.joinTransaction();
+            }
+            try {
+                List<?> list = queryClear(entityManager);
+                if (!list.isEmpty()) {
+                    Iterator<?> it = list.iterator();
+                    while (it.hasNext()) {
+                        Object item = it.next();
+                        entityManager.remove(item);
+                    }
+                    entityManager.flush();
+                    entityManager.close();
                 }
+            } catch (Exception ex) {
+                String contextInfo = String.format(SOMETHING_WENT_WRONG, ex.getMessage());
+                throw new PersistenceException(contextInfo, ex);
+            } finally {
                 try {
-                    List<?> list = queryClear(entityManager);
-                    if (!list.isEmpty()) {
-                        Iterator it = list.iterator();
-                        while (it.hasNext()) {
-                            Object item = it.next();
-                            entityManager.remove(item);
-                        }
-                        entityManager.flush();
+                    if (entityManager.isOpen()) {
                         entityManager.close();
                     }
-                } catch (Exception ex) {
-                    LOG.error("Something went wrong trying to clear the repository {}", ex.getMessage(), ex);
-                    throw new PersistenceException(ex);
-                } finally {
-                    try {
-                        if (entityManager.isOpen()) {
-                            entityManager.close();
-                        }
-                    } catch (Exception e) {
-                        // ignore
-                    }
+                } catch (Exception e) {
+                    // ignore
                 }
             }
         });
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaComponentTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaComponentTest.java
index e75e8c3c879..1a1d5ce550f 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaComponentTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaComponentTest.java
@@ -34,74 +34,80 @@ public class JpaComponentTest extends CamelTestSupport {
 
     @Test
     public void testJpaComponentConsumerHasLockModeType() throws Exception {
-        JpaComponent comp = new JpaComponent();
-        comp.setCamelContext(context);
-        assertNull(comp.getEntityManagerFactory());
-        assertNull(comp.getTransactionManager());
-
-        JpaEndpoint jpa
-                = (JpaEndpoint) comp.createEndpoint("jpa://" + SendEmail.class.getName() + "?lockModeType=PESSIMISTIC_WRITE");
-        JpaConsumer consumer = (JpaConsumer) jpa.createConsumer(null);
-
-        assertEquals(LockModeType.PESSIMISTIC_WRITE, consumer.getLockModeType());
+        try (JpaComponent comp = new JpaComponent()) {
+            comp.setCamelContext(context);
+            assertNull(comp.getEntityManagerFactory());
+            assertNull(comp.getTransactionManager());
+
+            JpaEndpoint jpa
+                    = (JpaEndpoint) comp
+                            .createEndpoint("jpa://" + SendEmail.class.getName() + "?lockModeType=PESSIMISTIC_WRITE");
+            JpaConsumer consumer = (JpaConsumer) jpa.createConsumer(null);
+
+            assertEquals(LockModeType.PESSIMISTIC_WRITE, consumer.getLockModeType());
+        }
     }
 
     @Test
     public void testJpaComponentCtr() throws Exception {
-        JpaComponent comp = new JpaComponent();
-        comp.setCamelContext(context);
-        assertNull(comp.getEntityManagerFactory());
-        assertNull(comp.getTransactionManager());
-
-        JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa://" + SendEmail.class.getName());
-        assertNotNull(jpa);
-        assertNotNull(jpa.getEntityType());
+        try (JpaComponent comp = new JpaComponent()) {
+            comp.setCamelContext(context);
+            assertNull(comp.getEntityManagerFactory());
+            assertNull(comp.getTransactionManager());
+
+            JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa://" + SendEmail.class.getName());
+            assertNotNull(jpa);
+            assertNotNull(jpa.getEntityType());
+        }
     }
 
     @Test
     public void testJpaComponentEMFandTM() throws Exception {
-        JpaComponent comp = new JpaComponent();
-        comp.setCamelContext(context);
-        assertNull(comp.getEntityManagerFactory());
-        assertNull(comp.getTransactionManager());
+        try (JpaComponent comp = new JpaComponent()) {
+            comp.setCamelContext(context);
+            assertNull(comp.getEntityManagerFactory());
+            assertNull(comp.getTransactionManager());
 
-        EntityManagerFactory fac = Persistence.createEntityManagerFactory("camel");
-        JpaTransactionManager tm = new JpaTransactionManager(fac);
-        tm.afterPropertiesSet();
+            EntityManagerFactory fac = Persistence.createEntityManagerFactory("camel");
+            JpaTransactionManager tm = new JpaTransactionManager(fac);
+            tm.afterPropertiesSet();
 
-        comp.setEntityManagerFactory(fac);
-        comp.setTransactionManager(tm);
+            comp.setEntityManagerFactory(fac);
+            comp.setTransactionManager(tm);
 
-        assertSame(fac, comp.getEntityManagerFactory());
-        assertSame(tm, comp.getTransactionManager());
+            assertSame(fac, comp.getEntityManagerFactory());
+            assertSame(tm, comp.getTransactionManager());
 
-        JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa://" + SendEmail.class.getName());
-        assertNotNull(jpa);
-        assertNotNull(jpa.getEntityType());
+            JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa://" + SendEmail.class.getName());
+            assertNotNull(jpa);
+            assertNotNull(jpa.getEntityType());
+        }
     }
 
     @Test
     public void testJpaComponentWithPath() throws Exception {
-        JpaComponent comp = new JpaComponent();
-        comp.setCamelContext(context);
-        assertNull(comp.getEntityManagerFactory());
-        assertNull(comp.getTransactionManager());
-
-        JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint(
-                "jpa://" + SendEmail.class.getName() + "?persistenceUnit=journalPersistenceUnit&usePersist=true");
-        assertNotNull(jpa);
-        assertNotNull(jpa.getEntityType());
+        try (JpaComponent comp = new JpaComponent()) {
+            comp.setCamelContext(context);
+            assertNull(comp.getEntityManagerFactory());
+            assertNull(comp.getTransactionManager());
+
+            JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint(
+                    "jpa://" + SendEmail.class.getName() + "?persistenceUnit=journalPersistenceUnit&usePersist=true");
+            assertNotNull(jpa);
+            assertNotNull(jpa.getEntityType());
+        }
     }
 
     @Test
     public void testJpaComponentEmptyPath() throws Exception {
-        JpaComponent comp = new JpaComponent();
-        comp.setCamelContext(context);
-        assertNull(comp.getEntityManagerFactory());
-        assertNull(comp.getTransactionManager());
-
-        JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa:?persistenceUnit=journalPersistenceUnit&usePersist=true");
-        assertNotNull(jpa);
-        assertNull(jpa.getEntityType());
+        try (JpaComponent comp = new JpaComponent()) {
+            comp.setCamelContext(context);
+            assertNull(comp.getEntityManagerFactory());
+            assertNull(comp.getTransactionManager());
+
+            JpaEndpoint jpa = (JpaEndpoint) comp.createEndpoint("jpa:?persistenceUnit=journalPersistenceUnit&usePersist=true");
+            assertNotNull(jpa);
+            assertNull(jpa.getEntityType());
+        }
     }
 }
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaEndpointTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaEndpointTest.java
index 0b77c258355..297782c7900 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaEndpointTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaEndpointTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jpa;
 
+import java.io.IOException;
+
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
 
@@ -31,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 public class JpaEndpointTest extends CamelTestSupport {
 
     @Test
-    public void testJpaEndpointCtr() {
+    public void testJpaEndpointCtr() throws IOException {
         JpaEndpoint jpa = new JpaEndpoint();
         jpa.setEntityType(SendEmail.class);
 
@@ -40,15 +42,17 @@ public class JpaEndpointTest extends CamelTestSupport {
 
         assertEquals("jpa://org.apache.camel.examples.SendEmail", jpa.getEndpointUri());
         assertEquals("camel", jpa.getPersistenceUnit());
+        jpa.close();
     }
 
     /**
      * 
+     * @throws     IOException
      * @deprecated
      */
     @Deprecated
     @Test
-    public void testJpaEndpointCtrUrl() {
+    public void testJpaEndpointCtrUrl() throws IOException {
         JpaEndpoint jpa = new JpaEndpoint("jpa://org.apache.camel.examples.SendEmail", null);
         jpa.setEntityType(SendEmail.class);
 
@@ -57,15 +61,17 @@ public class JpaEndpointTest extends CamelTestSupport {
 
         assertEquals("jpa://org.apache.camel.examples.SendEmail", jpa.getEndpointUri());
         assertEquals("camel", jpa.getPersistenceUnit());
+        jpa.close();
     }
 
     /**
      * 
+     * @throws     IOException
      * @deprecated
      */
     @Deprecated
     @Test
-    public void testJpaEndpointCtrUrlEMF() {
+    public void testJpaEndpointCtrUrlEMF() throws IOException {
         EntityManagerFactory fac = Persistence.createEntityManagerFactory("camel");
 
         JpaEndpoint jpa = new JpaEndpoint("jpa://org.apache.camel.examples.SendEmail", null);
@@ -77,15 +83,17 @@ public class JpaEndpointTest extends CamelTestSupport {
 
         assertEquals("jpa://org.apache.camel.examples.SendEmail", jpa.getEndpointUri());
         assertEquals("camel", jpa.getPersistenceUnit());
+        jpa.close();
     }
 
     /**
      * 
+     * @throws     IOException
      * @deprecated
      */
     @Deprecated
     @Test
-    public void testJpaEndpointCtrUrlEMFandTM() {
+    public void testJpaEndpointCtrUrlEMFandTM() throws IOException {
         EntityManagerFactory fac = Persistence.createEntityManagerFactory("camel");
         JpaTransactionManager tm = new JpaTransactionManager(fac);
         tm.afterPropertiesSet();
@@ -100,10 +108,11 @@ public class JpaEndpointTest extends CamelTestSupport {
 
         assertEquals("jpa://org.apache.camel.examples.SendEmail", jpa.getEndpointUri());
         assertEquals("camel", jpa.getPersistenceUnit());
+        jpa.close();
     }
 
     @Test
-    public void testJpaEndpointCustomEMFandTM() {
+    public void testJpaEndpointCustomEMFandTM() throws IOException {
         EntityManagerFactory fac = Persistence.createEntityManagerFactory("camel");
         JpaTransactionManager tm = new JpaTransactionManager(fac);
         tm.afterPropertiesSet();
@@ -119,5 +128,6 @@ public class JpaEndpointTest extends CamelTestSupport {
 
         assertEquals("jpa://org.apache.camel.examples.SendEmail", jpa.getEndpointUri());
         assertEquals("camel", jpa.getPersistenceUnit());
+        jpa.close();
     }
 }
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
index b42d184109e..8a681cefbf8 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
@@ -136,7 +136,7 @@ public class JpaTest {
         listEndpoint = camelContext.getEndpoint(getEndpointUri() + "&entityType=java.util.List", JpaEndpoint.class);
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.createEntityManager();
+        entityManager = endpoint.getEntityManagerFactory().createEntityManager();
 
         transactionTemplate.execute(new TransactionCallback<Object>() {
             public Object doInTransaction(TransactionStatus status) {
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
index 907dae4cf8e..2f3b2a80ae7 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryAndParametersTest.java
@@ -143,7 +143,7 @@ public class JpaWithNamedQueryAndParametersTest {
         endpoint = (JpaEndpoint) value;
 
         transactionTemplate = endpoint.createTransactionTemplate();
-        entityManager = endpoint.createEntityManager();
+        entityManager = endpoint.getEntityManagerFactory().createEntityManager();
     }
 
     protected String getEndpointUri() {
diff --git a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
index de2d3a1bb68..2937ceda539 100644
--- a/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
+++ b/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaRouteSharedEntityManagerTest.java
@@ -82,7 +82,7 @@ public class JpaRouteSharedEntityManagerTest extends AbstractJpaTest {
         Expression expression = parser.parseExpression("nativeEntityManagerFactory.brokerFactory.openBrokers");
         List<?> brokers = expression.getValue(context, List.class);
 
-        return brokers.size();
+        return brokers != null ? brokers.size() : 0;
     }
 
     @Test