You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/07 18:16:47 UTC

[camel] 01/01: CAMEL-19721: camel-activemq - ActiveMQ component should use jakarta based connection pool

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d52683508a6a667244fbe228229dda43f8b38fbf
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Aug 7 20:16:32 2023 +0200

    CAMEL-19721: camel-activemq - ActiveMQ component should use jakarta based connection pool
---
 .../component/activemq/ActiveMQComponent.java      | 12 ++++----
 .../component/activemq/ActiveMQConfiguration.java  | 34 +++++++++++++++-------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQComponent.java b/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQComponent.java
index 435f07aae0e..c786331a126 100644
--- a/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQComponent.java
+++ b/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQComponent.java
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.component.activemq;
 
+import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.activemq.Service;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.jms.JmsComponent;
@@ -41,7 +41,7 @@ import org.springframework.jms.core.JmsTemplate;
 @Component("activemq")
 public class ActiveMQComponent extends JmsComponent {
     private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList = new CopyOnWriteArrayList<>();
-    private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList = new CopyOnWriteArrayList<>();
+    private final CopyOnWriteArrayList<Object> pooledConnectionFactoryServiceList = new CopyOnWriteArrayList<>();
 
     public ActiveMQComponent() {
     }
@@ -177,7 +177,7 @@ public class ActiveMQComponent extends JmsComponent {
         super.setProperties(bean, parameters);
     }
 
-    protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) {
+    protected void addPooledConnectionFactoryService(Object pooledConnectionFactoryService) {
         pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService);
     }
 
@@ -215,9 +215,11 @@ public class ActiveMQComponent extends JmsComponent {
 
     @Override
     protected void doStop() throws Exception {
-        for (Service s : pooledConnectionFactoryServiceList) {
+        for (Object s : pooledConnectionFactoryServiceList) {
             try {
-                s.stop();
+                // invoke stop method if exists
+                Method m = s.getClass().getMethod("stop");
+                org.apache.camel.support.ObjectHelper.invokeMethod(m, s);
             } catch (Exception e) {
                 // ignore
             }
diff --git a/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQConfiguration.java b/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQConfiguration.java
index c4d9749248f..70cbbff6e6f 100644
--- a/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQConfiguration.java
+++ b/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/ActiveMQConfiguration.java
@@ -21,9 +21,10 @@ import java.lang.reflect.Constructor;
 import jakarta.jms.ConnectionFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.Service;
+import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.support.ObjectHelper;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.connection.DelegatingConnectionFactory;
 import org.springframework.jms.connection.JmsTransactionManager;
@@ -59,8 +60,8 @@ public class ActiveMQConfiguration extends JmsConfiguration {
     }
 
     /**
-     * @deprecated - use JmsConfiguration#getUsername()
      * @see        JmsConfiguration#getUsername()
+     * @deprecated - use JmsConfiguration#getUsername()
      */
     @Deprecated
     public String getUserName() {
@@ -68,8 +69,8 @@ public class ActiveMQConfiguration extends JmsConfiguration {
     }
 
     /**
-     * @deprecated - use JmsConfiguration#setUsername(String)
      * @see        JmsConfiguration#setUsername(String)
+     * @deprecated - use JmsConfiguration#setUsername(String)
      */
     @Deprecated
     public void setUserName(String userName) {
@@ -177,6 +178,7 @@ public class ActiveMQConfiguration extends JmsConfiguration {
             answer.setPassword(getPassword());
         }
         answer.setBrokerURL(getBrokerURL());
+        CamelContext context = activeMQComponent != null ? activeMQComponent.getCamelContext() : null;
         if (isUseSingleConnection()) {
             SingleConnectionFactory scf = new SingleConnectionFactory(answer);
             if (activeMQComponent != null) {
@@ -184,9 +186,9 @@ public class ActiveMQConfiguration extends JmsConfiguration {
             }
             return scf;
         } else if (isUsePooledConnection()) {
-            ConnectionFactory pcf = createPooledConnectionFactory(answer);
+            ConnectionFactory pcf = createPooledConnectionFactory(context, answer);
             if (activeMQComponent != null) {
-                activeMQComponent.addPooledConnectionFactoryService((Service) pcf);
+                activeMQComponent.addPooledConnectionFactoryService(pcf);
             }
             return pcf;
         } else {
@@ -194,17 +196,29 @@ public class ActiveMQConfiguration extends JmsConfiguration {
         }
     }
 
-    protected ConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
+    protected ConnectionFactory createPooledConnectionFactory(
+            CamelContext camelContext, ActiveMQConnectionFactory connectionFactory) {
         try {
-            Class<?> type = loadClass("org.apache.activemq.pool.PooledConnectionFactory", getClass().getClassLoader());
-            Constructor<?> constructor = type.getConstructor(org.apache.activemq.ActiveMQConnectionFactory.class);
-            return (ConnectionFactory) constructor.newInstance(connectionFactory);
+            Class<?> type = loadClass(camelContext, "org.messaginghub.pooled.jms.JmsPoolConnectionFactory",
+                    getClass().getClassLoader());
+
+            Constructor<?> constructor = type.getConstructor();
+            ConnectionFactory cf = (ConnectionFactory) constructor.newInstance();
+            ObjectHelper.invokeMethod(type.getDeclaredMethod("setConnectionFactory", Object.class), cf,
+                    connectionFactory);
+            return cf;
         } catch (Exception e) {
             throw new RuntimeCamelException("Failed to instantiate PooledConnectionFactory: " + e, e);
         }
     }
 
-    public static Class<?> loadClass(String name, ClassLoader loader) throws ClassNotFoundException {
+    public static Class<?> loadClass(CamelContext camelContext, String name, ClassLoader loader) throws ClassNotFoundException {
+        // if camel then use it to load the class
+        if (camelContext != null) {
+            return camelContext.getClassResolver()
+                    .resolveMandatoryClass("org.messaginghub.pooled.jms.JmsPoolConnectionFactory");
+        }
+
         ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
         if (contextClassLoader != null) {
             try {