You are viewing a plain text version of this content. The canonical version is available at its original link, which is not reproduced in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/10 15:02:02 UTC

[camel] branch main updated: CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1eccac6  CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2
1eccac6 is described below

commit 1eccac68bd6546d80459d3f100617a1692087f29
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 10 16:00:38 2022 +0100

    CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2
---
 components/camel-rabbitmq/pom.xml                  | 17 ++++++-------
 .../camel/component/rabbitmq/RabbitMQProducer.java | 28 +++++++++-------------
 .../rabbitmq/pool/PoolableChannelFactory.java      | 25 ++++++++++++-------
 .../component/rabbitmq/RabbitMQProducerTest.java   |  7 +++---
 4 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 4fabdea..a32aa92 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -35,19 +35,20 @@
     </properties>
 
     <dependencies>
+
         <dependency>
-            <groupId>com.rabbitmq</groupId>
-            <artifactId>amqp-client</artifactId>
-            <version>${rabbitmq-amqp-client-version}</version>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
         </dependency>
         <dependency>
-            <groupId>commons-pool</groupId>
-            <artifactId>commons-pool</artifactId>
-            <version>${commons-pool-version}</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>${commons-pool2-version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-support</artifactId>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>${rabbitmq-amqp-client-version}</version>
         </dependency>
 
         <!-- test infra -->
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 07e0419..8692ded 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -35,8 +36,9 @@ import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.commons.pool.ObjectPool;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,8 +102,6 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
 
     /**
      * Open connection and initialize channel pool
-     * 
-     * @throws Exception
      */
     private synchronized void openConnectionAndChannelPool() throws Exception {
         LOG.trace("Creating connection...");
@@ -110,19 +110,13 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
 
         LOG.trace("Creating channel pool...");
         int channelPoolMaxSize = getEndpoint().getChannelPoolMaxSize();
-        channelPool = new GenericObjectPool<>(
-                new PoolableChannelFactory(this.conn),
-                channelPoolMaxSize,
-                GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
-                getEndpoint().getChannelPoolMaxWait(),
-                channelPoolMaxSize,
-                GenericObjectPool.DEFAULT_MIN_IDLE,
-                GenericObjectPool.DEFAULT_TEST_ON_BORROW,
-                GenericObjectPool.DEFAULT_TEST_ON_RETURN,
-                GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS,
-                GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN,
-                GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS,
-                GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
+        long maxWait = getEndpoint().getChannelPoolMaxWait();
+
+        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+        config.setMaxWait(Duration.ofMillis(maxWait));
+        config.setMaxTotal(channelPoolMaxSize);
+
+        channelPool = new GenericObjectPool(new PoolableChannelFactory(this.conn), config);
         attemptDeclaration();
     }
 
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index 2f5e19a..7076d77 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -18,12 +18,14 @@ package org.apache.camel.component.rabbitmq.pool;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
-import org.apache.commons.pool.PoolableObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.PooledObjectFactory;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 
 /**
  * Channel lifecycle manager: create, check and close channel
  */
-public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+public class PoolableChannelFactory implements PooledObjectFactory<Channel> {
 
     /**
      * Parent connection
@@ -35,12 +37,13 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
     }
 
     @Override
-    public Channel makeObject() throws Exception {
-        return connection.createChannel();
+    public void activateObject(PooledObject<Channel> p) throws Exception {
+        // noop
     }
 
     @Override
-    public void destroyObject(Channel t) throws Exception {
+    public void destroyObject(PooledObject<Channel> p) throws Exception {
+        Channel t = p.getObject();
         try {
             t.close();
         } catch (Exception e) {
@@ -49,16 +52,20 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
     }
 
     @Override
-    public boolean validateObject(Channel t) {
-        return t.isOpen();
+    public void passivateObject(PooledObject<Channel> p) throws Exception {
+        // noop
     }
 
     @Override
-    public void activateObject(Channel t) throws Exception {
+    public boolean validateObject(PooledObject<Channel> p) {
+        Channel t = p.getObject();
+        return t.isOpen();
     }
 
     @Override
-    public void passivateObject(Channel t) throws Exception {
+    public PooledObject<Channel> makeObject() throws Exception {
+        Channel t = connection.createChannel();
+        return new DefaultPooledObject<>(t);
     }
 
 }
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 0751bd1..5775a14 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -33,7 +33,7 @@ import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.util.ReflectionHelper;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -216,9 +216,8 @@ public class RabbitMQProducerTest {
         assertNotNull(channelPool);
         assertTrue(channelPool instanceof GenericObjectPool);
         GenericObjectPool<Channel> genericObjectPool = (GenericObjectPool<Channel>) channelPool;
-        assertEquals(123, genericObjectPool.getMaxActive());
-        assertEquals(123, genericObjectPool.getMaxIdle());
-        assertEquals(321L, genericObjectPool.getMaxWait());
+        assertEquals(123, genericObjectPool.getMaxTotal());
+        assertEquals(321L, genericObjectPool.getMaxWaitDuration().toMillis());
     }
 
     private static class Something {