You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/10 15:02:02 UTC
[camel] branch main updated: CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1eccac6 CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2
1eccac6 is described below
commit 1eccac68bd6546d80459d3f100617a1692087f29
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 10 16:00:38 2022 +0100
CAMEL-17461: camel-rabbitmq - Migrate from commons-pool v1 to v2
---
components/camel-rabbitmq/pom.xml | 17 ++++++-------
.../camel/component/rabbitmq/RabbitMQProducer.java | 28 +++++++++-------------
.../rabbitmq/pool/PoolableChannelFactory.java | 25 ++++++++++++-------
.../component/rabbitmq/RabbitMQProducerTest.java | 7 +++---
4 files changed, 39 insertions(+), 38 deletions(-)
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 4fabdea..a32aa92 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -35,19 +35,20 @@
</properties>
<dependencies>
+
<dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>${rabbitmq-amqp-client-version}</version>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
</dependency>
<dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>${commons-pool-version}</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>${commons-pool2-version}</version>
</dependency>
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-support</artifactId>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq-amqp-client-version}</version>
</dependency>
<!-- test infra -->
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 07e0419..8692ded 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,8 +36,9 @@ import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
-import org.apache.commons.pool.ObjectPool;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,8 +102,6 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
/**
* Open connection and initialize channel pool
- *
- * @throws Exception
*/
private synchronized void openConnectionAndChannelPool() throws Exception {
LOG.trace("Creating connection...");
@@ -110,19 +110,13 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
LOG.trace("Creating channel pool...");
int channelPoolMaxSize = getEndpoint().getChannelPoolMaxSize();
- channelPool = new GenericObjectPool<>(
- new PoolableChannelFactory(this.conn),
- channelPoolMaxSize,
- GenericObjectPool.WHEN_EXHAUSTED_BLOCK,
- getEndpoint().getChannelPoolMaxWait(),
- channelPoolMaxSize,
- GenericObjectPool.DEFAULT_MIN_IDLE,
- GenericObjectPool.DEFAULT_TEST_ON_BORROW,
- GenericObjectPool.DEFAULT_TEST_ON_RETURN,
- GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS,
- GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN,
- GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS,
- GenericObjectPool.DEFAULT_TEST_WHILE_IDLE);
+ long maxWait = getEndpoint().getChannelPoolMaxWait();
+
+ GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+ config.setMaxWait(Duration.ofMillis(maxWait));
+ config.setMaxTotal(channelPoolMaxSize);
+
+ channelPool = new GenericObjectPool(new PoolableChannelFactory(this.conn), config);
attemptDeclaration();
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index 2f5e19a..7076d77 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -18,12 +18,14 @@ package org.apache.camel.component.rabbitmq.pool;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
-import org.apache.commons.pool.PoolableObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.PooledObjectFactory;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
/**
* Channel lifecycle manager: create, check and close channel
*/
-public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+public class PoolableChannelFactory implements PooledObjectFactory<Channel> {
/**
* Parent connection
@@ -35,12 +37,13 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
}
@Override
- public Channel makeObject() throws Exception {
- return connection.createChannel();
+ public void activateObject(PooledObject<Channel> p) throws Exception {
+ // noop
}
@Override
- public void destroyObject(Channel t) throws Exception {
+ public void destroyObject(PooledObject<Channel> p) throws Exception {
+ Channel t = p.getObject();
try {
t.close();
} catch (Exception e) {
@@ -49,16 +52,20 @@ public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
}
@Override
- public boolean validateObject(Channel t) {
- return t.isOpen();
+ public void passivateObject(PooledObject<Channel> p) throws Exception {
+ // noop
}
@Override
- public void activateObject(Channel t) throws Exception {
+ public boolean validateObject(PooledObject<Channel> p) {
+ Channel t = p.getObject();
+ return t.isOpen();
}
@Override
- public void passivateObject(Channel t) throws Exception {
+ public PooledObject<Channel> makeObject() throws Exception {
+ Channel t = connection.createChannel();
+ return new DefaultPooledObject<>(t);
}
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 0751bd1..5775a14 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -33,7 +33,7 @@ import org.apache.camel.Message;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.util.ReflectionHelper;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -216,9 +216,8 @@ public class RabbitMQProducerTest {
assertNotNull(channelPool);
assertTrue(channelPool instanceof GenericObjectPool);
GenericObjectPool<Channel> genericObjectPool = (GenericObjectPool<Channel>) channelPool;
- assertEquals(123, genericObjectPool.getMaxActive());
- assertEquals(123, genericObjectPool.getMaxIdle());
- assertEquals(321L, genericObjectPool.getMaxWait());
+ assertEquals(123, genericObjectPool.getMaxTotal());
+ assertEquals(321L, genericObjectPool.getMaxWaitDuration().toMillis());
}
private static class Something {