You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/04 07:43:56 UTC
[1/3] camel git commit: CAMEL-7421 Adding Channel pooling in RabbitMQProducer
Repository: camel
Updated Branches:
refs/heads/master cbdfe050e -> a8dfb0359
CAMEL-7421 Adding Channel pooling in RabbitMQProducer
Fix RabbitMQSpringIntTest
Replace custom object pool by Commons Pool
Fix Spring integration test again
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ad50186
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ad50186
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ad50186
Branch: refs/heads/master
Commit: 3ad501862aa11d16fe882780a5e0456c37b8c1b3
Parents: cbdfe05
Author: Gerald Quintana <ge...@zenika.com>
Authored: Wed May 14 16:35:21 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:29:09 2014 +0100
----------------------------------------------------------------------
components/camel-rabbitmq/pom.xml | 214 +++++++--------
.../component/rabbitmq/RabbitMQProducer.java | 131 ++++++++--
.../rabbitmq/pool/PoolableChannelFactory.java | 59 +++++
.../rabbitmq/RabbitMQSpringIntTest.java | 257 ++++++++++---------
.../rabbitmq/RabbitMQSpringIntTest-context.xml | 7 +-
5 files changed, 410 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 4c82a15..675a07a 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -1,107 +1,107 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.camel</groupId>
- <artifactId>components</artifactId>
- <version>2.15-SNAPSHOT</version>
- </parent>
-
- <artifactId>camel-rabbitmq</artifactId>
- <packaging>bundle</packaging>
- <name>Camel :: RabbitMQ</name>
- <description>Camel RabbitMQ Component</description>
-
- <properties>
- <camel.osgi.export.pkg>
- org.apache.camel.component.rabbitmq.*
- </camel.osgi.export.pkg>
- <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>${rabbitmq-amqp-client-version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core</artifactId>
- </dependency>
-
- <!-- testing -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test-spring</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core-xml</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/*IntTest*</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>itest</id>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>None</exclude>
- </excludes>
- <includes>
- <include>**/*IntTest*</include>
- </includes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.15-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-rabbitmq</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: RabbitMQ</name>
+ <description>Camel RabbitMQ Component</description>
+
+ <properties>
+ <camel.osgi.export.pkg>
+ org.apache.camel.component.rabbitmq.*
+ </camel.osgi.export.pkg>
+ <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq-amqp-client-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>${commons-pool-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*IntTest*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>itest</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/*.xml</exclude>
+ </excludes>
+ <includes>
+ <include>**/*IntTest*</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index f5c7eb4..755fa93 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,13 +16,6 @@
*/
package org.apache.camel.component.rabbitmq;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -30,13 +23,30 @@ import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+
public class RabbitMQProducer extends DefaultProducer {
private int closeTimeout = 30 * 1000;
private Connection conn;
- private Channel channel;
+ /**
+ * Maximum number of opened channel in pool
+ */
+ private int channelPoolMaxSize = 10;
+ /**
+ * Maximum time (in milliseconds) waiting for channel
+ */
+ private long channelPoolMaxWait = 1000;
+ private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
-
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
}
@@ -46,19 +56,41 @@ public class RabbitMQProducer extends DefaultProducer {
return (RabbitMQEndpoint) super.getEndpoint();
}
/**
- * Open connection and channel
+ * Channel callback (similar to Spring JDBC ConnectionCallback)
+ */
+ private static interface ChannelCallback<T> {
+ public T doWithChannel(Channel channel) throws Exception;
+ }
+ /**
+ * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
+ */
+ private <T> T execute(ChannelCallback<T> callback) throws Exception {
+ Channel channel = channelPool.borrowObject();
+ try {
+ return callback.doWithChannel(channel);
+ } finally {
+ channelPool.returnObject(channel);
+ }
+ }
+ /**
+ * Open connection and initialize channel pool
*/
- private void openConnectionAndChannel() throws IOException {
+ private void openConnectionAndChannelPool() throws Exception {
log.trace("Creating connection...");
this.conn = getEndpoint().connect(executorService);
log.debug("Created connection: {}", conn);
- log.trace("Creating channel...");
- this.channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
+ log.trace("Creating channel pool...");
+ channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
if (getEndpoint().isDeclare()) {
- getEndpoint().declareExchangeAndQueue(this.channel);
- }
+ execute(new ChannelCallback<Void>() {
+ @Override
+ public Void doWithChannel(Channel channel) throws Exception {
+ getEndpoint().declareExchangeAndQueue(channel);
+ return null;
+ }
+ });
+ }
}
@Override
@@ -66,7 +98,7 @@ public class RabbitMQProducer extends DefaultProducer {
this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
try {
- openConnectionAndChannel();
+ openConnectionAndChannelPool();
} catch (IOException e) {
log.warn("Failed to create connection", e);
}
@@ -75,12 +107,8 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* If needed, close Connection and Channel
*/
- private void closeConnectionAndChannel() throws IOException {
- if (channel != null) {
- log.debug("Closing channel: {}", channel);
- channel.close();
- channel = null;
- }
+ private void closeConnectionAndChannel() throws Exception {
+ channelPool.close();
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
conn.close(closeTimeout);
@@ -113,13 +141,30 @@ public class RabbitMQProducer extends DefaultProducer {
throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
}
byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
- AMQP.BasicProperties.Builder properties = buildProperties(exchange);
+ AMQP.BasicProperties properties = buildProperties(exchange).build();
- if (channel == null) {
+ basicPublish(exchangeName, key, properties, messageBodyBytes);
+ }
+
+ /**
+ * Send a message borrowing a channel from the pool
+ * @param exchange Target exchange
+ * @param routingKey Routing key
+ * @param properties Header properties
+ * @param body Body content
+ */
+ private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+ if (channelPool==null) {
// Open connection and channel lazily
- openConnectionAndChannel();
+ openConnectionAndChannelPool();
}
- channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
+ execute(new ChannelCallback<Void>() {
+ @Override
+ public Void doWithChannel(Channel channel) throws Exception {
+ channel.basicPublish(exchange, routingKey, properties, body);
+ return null;
+ }
+ });
}
AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
@@ -246,4 +291,36 @@ public class RabbitMQProducer extends DefaultProducer {
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
+
+ /**
+ * Get maximum number of opened channel in pool
+ * @return Maximum number of opened channel in pool
+ */
+ public int getChannelPoolMaxSize() {
+ return channelPoolMaxSize;
+ }
+
+ /**
+ * Set maximum number of opened channel in pool
+ * @param channelPoolMaxSize Maximum number of opened channel in pool
+ */
+ public void setChannelPoolMaxSize(int channelPoolMaxSize) {
+ this.channelPoolMaxSize = channelPoolMaxSize;
+ }
+
+ /**
+ * Get the maximum number of milliseconds to wait for a channel from the pool
+ * @return Maximum number of milliseconds waiting for a channel
+ */
+ public long getChannelPoolMaxWait() {
+ return channelPoolMaxWait;
+ }
+
+ /**
+ * Set the maximum number of milliseconds to wait for a channel from the pool
+ * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
+ */
+ public void setChannelPoolMaxWait(long channelPoolMaxWait) {
+ this.channelPoolMaxWait = channelPoolMaxWait;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
new file mode 100644
index 0000000..b9bed13
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rabbitmq.pool;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * Channel lifecyle manager: create, check and close channel
+ */
+public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+ /**
+ * Parent connection
+ */
+ private final Connection connection;
+
+ public PoolableChannelFactory(Connection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public Channel makeObject() throws Exception {
+ return connection.createChannel();
+ }
+
+ @Override
+ public void destroyObject(Channel t) throws Exception {
+ t.close();
+ }
+
+ @Override
+ public boolean validateObject(Channel t) {
+ return t.isOpen();
+ }
+
+ @Override
+ public void activateObject(Channel t) throws Exception {
+ }
+
+ @Override
+ public void passivateObject(Channel t) throws Exception {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index 6119082..f65b909 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -1,122 +1,135 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.rabbitmq;
-
-import java.io.IOException;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-
-import org.apache.camel.Produce;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import static org.junit.Assert.assertEquals;
-/**
- * Test RabbitMQ component with Spring DSL
- */
-@RunWith(CamelSpringJUnit4ClassRunner.class)
-@ContextConfiguration("RabbitMQSpringIntTest-context.xml")
-public class RabbitMQSpringIntTest {
- @Produce(uri = "direct:rabbitMQ")
- protected ProducerTemplate template;
- @Autowired
- private ConnectionFactory connectionFactory;
- private Connection connection;
- private Channel channel;
-
- private Connection openConnection() throws IOException {
- if (connection == null) {
- connection = connectionFactory.newConnection();
- }
- return connection;
- }
-
- private Channel openChannel() throws IOException {
- if (channel == null) {
- channel = openConnection().createChannel();
- }
- return channel;
- }
-
- @Before
- public void bindQueueExchange() throws IOException {
- openChannel();
- channel.exchangeDeclare("ex2", "direct", true, false, null);
- channel.queueDeclare("q2", true, false, false, null);
- channel.queueBind("q2", "ex2", "rk2");
- }
-
- @After
- public void closeConnection() {
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException e) {
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException e) {
- }
- }
- }
-
- private static final class LastDeliveryConsumer extends DefaultConsumer {
- private byte[] lastBody;
-
- private LastDeliveryConsumer(Channel channel) {
- super(channel);
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- lastBody = body;
- super.handleDelivery(consumerTag, envelope, properties, body);
- }
-
- public byte[] getLastBody() {
- return lastBody;
- }
- }
-
- @Test
- public void testSendCsutomConnectionFactory() throws Exception {
- String body = "Hello Rabbit";
- template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
-
- openChannel();
- LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
- channel.basicConsume("q2", true, consumer);
- int i = 10;
- while (consumer.getLastBody() == null && i > 0) {
- Thread.sleep(1000L);
- i--;
- }
- assertEquals(body, new String(consumer.getLastBody()));
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Test RabbitMQ component with Spring DSL
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class RabbitMQSpringIntTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate template;
+ @Autowired
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+ private boolean isConnectionOpened() {
+ return connection!=null && connection.isOpen();
+ }
+ private Connection openConnection() throws IOException {
+ if (!isConnectionOpened()) {
+ LOGGER.info("Open connection");
+ connection = connectionFactory.newConnection();
+ }
+ return connection;
+ }
+ private boolean isChannelOpened() {
+ return channel != null && channel.isOpen();
+ }
+ private Channel openChannel() throws IOException {
+ if (!isChannelOpened()) {
+ LOGGER.info("Open channel");
+ channel = openConnection().createChannel();
+ }
+ return channel;
+ }
+
+ @Before
+ public void bindQueueExchange() throws IOException {
+ openChannel();
+ /*
+ LOGGER.info("Declare exchange queue");
+ channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>());
+ channel.queueDeclare("q2", true, false, false, null);
+ channel.queueBind("q2", "ex2", "rk2");
+ */
+ }
+
+ @After
+ public void closeConnection() {
+ if (isChannelOpened()) {
+ try {
+ LOGGER.info("Close channel");
+ channel.close();
+ } catch (IOException e) {
+ }
+ }
+ if (isConnectionOpened()) {
+ try {
+ LOGGER.info("Close connection");
+ connection.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ private static final class LastDeliveryConsumer extends DefaultConsumer {
+ private byte[] lastBody;
+
+ private LastDeliveryConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ lastBody = body;
+ super.handleDelivery(consumerTag, envelope, properties, body);
+ }
+
+ public byte[] getLastBody() {
+ return lastBody;
+ }
+ public String getLastBodyAsString() {
+ return lastBody == null ? null : new String(lastBody);
+ }
+ }
+
+ @Test
+ public void testSendCsutomConnectionFactory() throws Exception {
+ String body = "Hello Rabbit";
+ template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+ openChannel();
+ LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+ channel.basicConsume("q2", true, consumer);
+ int i = 10;
+ while (consumer.getLastBody() == null && i > 0) {
+ Thread.sleep(1000L);
+ i--;
+ }
+ assertEquals(body, consumer.getLastBodyAsString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
index 6810583..b4688c7 100644
--- a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
+++ b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
@@ -21,7 +21,10 @@
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
-
+ <!-- To create and grant user cameltest:
+ rabbitmqctl add_user cameltest cameltest
+ rabbitmqctl set_permissions -p / cameltest ".*" ".*" ".*"
+ -->
<!-- START SNIPPET: custom connection factory -->
<bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="host" value="localhost"/>
@@ -33,7 +36,7 @@
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:rabbitMQ"/>
- <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&amp;queue=q2"/>
+ <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&amp;queue=q2&amp;routingKey=rk2"/>
</route>
</camelContext>
<!-- END SNIPPET: example -->
[2/3] camel git commit: CAMEL-7421: Fixed CS
Posted by da...@apache.org.
CAMEL-7421: Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/00e93fc3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/00e93fc3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/00e93fc3
Branch: refs/heads/master
Commit: 00e93fc3b5664b3b830a163819c285a0150532f5
Parents: 3ad5018
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 4 07:34:40 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:34:40 2014 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQProducer.java | 53 ++++++++++++--------
.../rabbitmq/pool/PoolableChannelFactory.java | 17 ++++---
.../rabbitmq/RabbitMQSpringIntTest.java | 33 ++++++------
3 files changed, 59 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 755fa93..54562d5 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,20 +16,20 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ObjectHelper;
-
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Exchange;
import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
@@ -42,11 +42,12 @@ public class RabbitMQProducer extends DefaultProducer {
*/
private int channelPoolMaxSize = 10;
/**
- * Maximum time (in milliseconds) waiting for channel
- */
+ * Maximum time (in milliseconds) waiting for channel
+ */
private long channelPoolMaxWait = 1000;
private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
+
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
}
@@ -55,12 +56,14 @@ public class RabbitMQProducer extends DefaultProducer {
public RabbitMQEndpoint getEndpoint() {
return (RabbitMQEndpoint) super.getEndpoint();
}
+
/**
* Channel callback (similar to Spring JDBC ConnectionCallback)
*/
private static interface ChannelCallback<T> {
- public T doWithChannel(Channel channel) throws Exception;
+ T doWithChannel(Channel channel) throws Exception;
}
+
/**
* Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
*/
@@ -72,6 +75,7 @@ public class RabbitMQProducer extends DefaultProducer {
channelPool.returnObject(channel);
}
}
+
/**
* Open connection and initialize channel pool
*/
@@ -81,7 +85,7 @@ public class RabbitMQProducer extends DefaultProducer {
log.debug("Created connection: {}", conn);
log.trace("Creating channel pool...");
- channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
+ channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
if (getEndpoint().isDeclare()) {
execute(new ChannelCallback<Void>() {
@Override
@@ -147,14 +151,15 @@ public class RabbitMQProducer extends DefaultProducer {
}
/**
- * Send a message borrowing a channel from the pool
- * @param exchange Target exchange
+ * Send a message borrowing a channel from the pool.
+ *
+ * @param exchange Target exchange
* @param routingKey Routing key
* @param properties Header properties
- * @param body Body content
+ * @param body Body content
*/
private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception {
- if (channelPool==null) {
+ if (channelPool == null) {
// Open connection and channel lazily
openConnectionAndChannelPool();
}
@@ -174,7 +179,7 @@ public class RabbitMQProducer extends DefaultProducer {
if (contentType != null) {
properties.contentType(contentType.toString());
}
-
+
final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
if (priority != null) {
properties.priority(Integer.parseInt(priority.toString()));
@@ -263,8 +268,8 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* Strategy to test if the given header is valid
*
- * @param headerValue the header value
- * @return the value to use, <tt>null</tt> to ignore this header
+ * @param headerValue the header value
+ * @return the value to use, <tt>null</tt> to ignore this header
* @see com.rabbitmq.client.impl.Frame#fieldValueSize
*/
private Object getValidRabbitMQHeaderValue(Object headerValue) {
@@ -294,33 +299,37 @@ public class RabbitMQProducer extends DefaultProducer {
/**
* Get maximum number of opened channel in pool
+ *
* @return Maximum number of opened channel in pool
*/
public int getChannelPoolMaxSize() {
- return channelPoolMaxSize;
+ return channelPoolMaxSize;
}
/**
* Set maximum number of opened channel in pool
+ *
* @param channelPoolMaxSize Maximum number of opened channel in pool
*/
public void setChannelPoolMaxSize(int channelPoolMaxSize) {
- this.channelPoolMaxSize = channelPoolMaxSize;
+ this.channelPoolMaxSize = channelPoolMaxSize;
}
/**
* Get the maximum number of milliseconds to wait for a channel from the pool
+ *
* @return Maximum number of milliseconds waiting for a channel
*/
public long getChannelPoolMaxWait() {
- return channelPoolMaxWait;
+ return channelPoolMaxWait;
}
/**
* Set the maximum number of milliseconds to wait for a channel from the pool
+ *
* @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
*/
public void setChannelPoolMaxWait(long channelPoolMaxWait) {
- this.channelPoolMaxWait = channelPoolMaxWait;
+ this.channelPoolMaxWait = channelPoolMaxWait;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index b9bed13..b10201f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -1,9 +1,10 @@
-/*
- * Copyright 2014 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -13,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.rabbitmq.pool;
import com.rabbitmq.client.Channel;
@@ -21,9 +21,10 @@ import com.rabbitmq.client.Connection;
import org.apache.commons.pool.PoolableObjectFactory;
/**
- * Channel lifecyle manager: create, check and close channel
+ * Channel lifecycle manager: create, check and close channel
*/
public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+
/**
* Parent connection
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index f65b909..6a3a3a5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -16,39 +16,48 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.*;
+import java.io.IOException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-import java.io.IOException;
-import java.util.HashMap;
-
import static org.junit.Assert.assertEquals;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
/**
* Test RabbitMQ component with Spring DSL
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class RabbitMQSpringIntTest {
+
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+
@Produce(uri = "direct:rabbitMQ")
protected ProducerTemplate template;
@Autowired
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
+
private boolean isConnectionOpened() {
- return connection!=null && connection.isOpen();
+ return connection != null && connection.isOpen();
}
+
private Connection openConnection() throws IOException {
if (!isConnectionOpened()) {
LOGGER.info("Open connection");
@@ -56,9 +65,11 @@ public class RabbitMQSpringIntTest {
}
return connection;
}
+
private boolean isChannelOpened() {
return channel != null && channel.isOpen();
}
+
private Channel openChannel() throws IOException {
if (!isChannelOpened()) {
LOGGER.info("Open channel");
@@ -70,12 +81,6 @@ public class RabbitMQSpringIntTest {
@Before
public void bindQueueExchange() throws IOException {
openChannel();
- /*
- LOGGER.info("Declare exchange queue");
- channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>());
- channel.queueDeclare("q2", true, false, false, null);
- channel.queueBind("q2", "ex2", "rk2");
- */
}
@After
@@ -118,7 +123,7 @@ public class RabbitMQSpringIntTest {
}
@Test
- public void testSendCsutomConnectionFactory() throws Exception {
+ public void testSendCustomConnectionFactory() throws Exception {
String body = "Hello Rabbit";
template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
[3/3] camel git commit: CAMEL-7421: Allow the options to be configured
from the endpoint, which is the standard way in Camel.
Posted by da...@apache.org.
CAMEL-7421: Allow the options to be configured from the endpoint, which is the standard way in Camel.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a8dfb035
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a8dfb035
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a8dfb035
Branch: refs/heads/master
Commit: a8dfb03590029365418341a249627e4049acae9c
Parents: 00e93fc
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 4 07:43:43 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:43:43 2014 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQEndpoint.java | 40 ++++++++++++++++
.../component/rabbitmq/RabbitMQProducer.java | 48 ++------------------
.../features/src/main/resources/features.xml | 1 +
3 files changed, 44 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 645a572..f36fd82 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -95,6 +95,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private String deadLetterQueue;
//Dead letter exchange type.
private String deadLetterExchangeType = "direct";
+ //Maximum number of opened channel in pool
+ private int channelPoolMaxSize = 10;
+ //Maximum time (in milliseconds) waiting for channel
+ private long channelPoolMaxWait = 1000;
public RabbitMQEndpoint() {
}
@@ -539,4 +543,40 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public void setDeadLetterExchangeType(String deadLetterExchangeType) {
this.deadLetterExchangeType = deadLetterExchangeType;
}
+
+ /**
+ * Get maximum number of opened channel in pool
+ *
+ * @return Maximum number of opened channel in pool
+ */
+ public int getChannelPoolMaxSize() {
+ return channelPoolMaxSize;
+ }
+
+ /**
+ * Set maximum number of opened channel in pool
+ *
+ * @param channelPoolMaxSize Maximum number of opened channel in pool
+ */
+ public void setChannelPoolMaxSize(int channelPoolMaxSize) {
+ this.channelPoolMaxSize = channelPoolMaxSize;
+ }
+
+ /**
+ * Get the maximum number of milliseconds to wait for a channel from the pool
+ *
+ * @return Maximum number of milliseconds waiting for a channel
+ */
+ public long getChannelPoolMaxWait() {
+ return channelPoolMaxWait;
+ }
+
+ /**
+ * Set the maximum number of milliseconds to wait for a channel from the pool
+ *
+ * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
+ */
+ public void setChannelPoolMaxWait(long channelPoolMaxWait) {
+ this.channelPoolMaxWait = channelPoolMaxWait;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 54562d5..96bd516 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -35,18 +35,10 @@ import org.apache.commons.pool.impl.GenericObjectPool;
public class RabbitMQProducer extends DefaultProducer {
- private int closeTimeout = 30 * 1000;
private Connection conn;
- /**
- * Maximum number of opened channel in pool
- */
- private int channelPoolMaxSize = 10;
- /**
- * Maximum time (in milliseconds) waiting for channel
- */
- private long channelPoolMaxWait = 1000;
private ObjectPool<Channel> channelPool;
private ExecutorService executorService;
+ private int closeTimeout = 30 * 1000;
public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
super(endpoint);
@@ -85,7 +77,8 @@ public class RabbitMQProducer extends DefaultProducer {
log.debug("Created connection: {}", conn);
log.trace("Creating channel pool...");
- channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
+ channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(),
+ GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getEndpoint().getChannelPoolMaxWait());
if (getEndpoint().isDeclare()) {
execute(new ChannelCallback<Void>() {
@Override
@@ -297,39 +290,4 @@ public class RabbitMQProducer extends DefaultProducer {
this.closeTimeout = closeTimeout;
}
- /**
- * Get maximum number of opened channel in pool
- *
- * @return Maximum number of opened channel in pool
- */
- public int getChannelPoolMaxSize() {
- return channelPoolMaxSize;
- }
-
- /**
- * Set maximum number of opened channel in pool
- *
- * @param channelPoolMaxSize Maximum number of opened channel in pool
- */
- public void setChannelPoolMaxSize(int channelPoolMaxSize) {
- this.channelPoolMaxSize = channelPoolMaxSize;
- }
-
- /**
- * Get the maximum number of milliseconds to wait for a channel from the pool
- *
- * @return Maximum number of milliseconds waiting for a channel
- */
- public long getChannelPoolMaxWait() {
- return channelPoolMaxWait;
- }
-
- /**
- * Set the maximum number of milliseconds to wait for a channel from the pool
- *
- * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
- */
- public void setChannelPoolMaxWait(long channelPoolMaxWait) {
- this.channelPoolMaxWait = channelPoolMaxWait;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index d335618..51505f8 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1069,6 +1069,7 @@
<feature name='camel-rabbitmq' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
<bundle dependency='true'>mvn:com.rabbitmq/amqp-client/${rabbitmq-amqp-client-version}</bundle>
+ <bundle dependency='true'>mvn:commons-pool/commons-pool/${commons-pool-version}</bundle>
<bundle>mvn:org.apache.camel/camel-rabbitmq/${project.version}</bundle>
</feature>
<feature name='camel-restlet' version='${project.version}' resolver='(obr)' start-level='50'>