You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/04 07:43:56 UTC

[1/3] camel git commit: CAMEL-7421 Adding Channel pooling in RabbitMQProducer

Repository: camel
Updated Branches:
  refs/heads/master cbdfe050e -> a8dfb0359


CAMEL-7421 Adding Channel pooling in RabbitMQProducer

Fix RabbitMQSpringIntTest

Replace custom object pool by Commons Pool

Fix Spring integration test again


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3ad50186
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3ad50186
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3ad50186

Branch: refs/heads/master
Commit: 3ad501862aa11d16fe882780a5e0456c37b8c1b3
Parents: cbdfe05
Author: Gerald Quintana <ge...@zenika.com>
Authored: Wed May 14 16:35:21 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:29:09 2014 +0100

----------------------------------------------------------------------
 components/camel-rabbitmq/pom.xml               | 214 +++++++--------
 .../component/rabbitmq/RabbitMQProducer.java    | 131 ++++++++--
 .../rabbitmq/pool/PoolableChannelFactory.java   |  59 +++++
 .../rabbitmq/RabbitMQSpringIntTest.java         | 257 ++++++++++---------
 .../rabbitmq/RabbitMQSpringIntTest-context.xml  |   7 +-
 5 files changed, 410 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 4c82a15..675a07a 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -1,107 +1,107 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-    
-    http://www.apache.org/licenses/LICENSE-2.0
-    
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.camel</groupId>
-    <artifactId>components</artifactId>
-    <version>2.15-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>camel-rabbitmq</artifactId>
-  <packaging>bundle</packaging>
-  <name>Camel :: RabbitMQ</name>
-  <description>Camel RabbitMQ Component</description>
-
-  <properties>
-    <camel.osgi.export.pkg>
-      org.apache.camel.component.rabbitmq.*
-    </camel.osgi.export.pkg>
-    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.rabbitmq</groupId>
-      <artifactId>amqp-client</artifactId>
-      <version>${rabbitmq-amqp-client-version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-core</artifactId>
-    </dependency>
-
-    <!-- testing -->
-      <dependency>
-          <groupId>org.apache.camel</groupId>
-          <artifactId>camel-test-spring</artifactId>
-          <scope>test</scope>
-      </dependency>
-      <dependency>
-          <groupId>org.apache.camel</groupId>
-          <artifactId>camel-core-xml</artifactId>
-          <scope>test</scope>
-      </dependency>
-      <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>**/*IntTest*</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-  <profiles>
-    <profile>
-      <id>itest</id>
-      <build>
-        <plugins>
-          <plugin>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>None</exclude>
-              </excludes>
-              <includes>
-                <include>**/*IntTest*</include>
-              </includes>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+    
+    http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.15-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-rabbitmq</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: RabbitMQ</name>
+  <description>Camel RabbitMQ Component</description>
+
+  <properties>
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.rabbitmq.*
+    </camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=rabbitmq</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>${rabbitmq-amqp-client-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+      <version>${commons-pool-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/*IntTest*</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>itest</id>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/*.xml</exclude>
+              </excludes>
+              <includes>
+                <include>**/*IntTest*</include>
+              </includes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index f5c7eb4..755fa93 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,13 +16,6 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -30,13 +23,30 @@ import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+
 public class RabbitMQProducer extends DefaultProducer {
 
     private int closeTimeout = 30 * 1000;
     private Connection conn;
-    private Channel channel;
+    /**
+     * Maximum number of open channels in the pool
+     */
+    private int channelPoolMaxSize = 10;
+    /**
+    * Maximum time (in milliseconds) waiting for channel
+    */
+    private long channelPoolMaxWait = 1000;
+    private ObjectPool<Channel> channelPool;
     private ExecutorService executorService;
-   
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
     }
@@ -46,19 +56,41 @@ public class RabbitMQProducer extends DefaultProducer {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
     /**
-     * Open connection and channel
+     * Channel callback (similar to Spring JDBC ConnectionCallback)
+     */
+    private static interface ChannelCallback<T> {
+        public T doWithChannel(Channel channel) throws Exception;
+    }
+    /**
+     * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
+     */
+    private <T> T execute(ChannelCallback<T> callback) throws Exception {
+        Channel channel = channelPool.borrowObject();
+        try {
+            return callback.doWithChannel(channel);
+        } finally {
+            channelPool.returnObject(channel);
+        }
+    }
+    /**
+     * Open connection and initialize channel pool
      */
-    private void openConnectionAndChannel() throws IOException {
+    private void openConnectionAndChannelPool() throws Exception {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executorService);
         log.debug("Created connection: {}", conn);
 
-        log.trace("Creating channel...");
-        this.channel = conn.createChannel();
-        log.debug("Created channel: {}", channel);
+        log.trace("Creating channel pool...");
+        channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
         if (getEndpoint().isDeclare()) {
-            getEndpoint().declareExchangeAndQueue(this.channel);
-        }        
+            execute(new ChannelCallback<Void>() {
+                @Override
+                public Void doWithChannel(Channel channel) throws Exception {
+                    getEndpoint().declareExchangeAndQueue(channel);
+                    return null;
+                }
+            });
+        }
     }
 
     @Override
@@ -66,7 +98,7 @@ public class RabbitMQProducer extends DefaultProducer {
         this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
 
         try {
-            openConnectionAndChannel();
+            openConnectionAndChannelPool();
         } catch (IOException e) {
             log.warn("Failed to create connection", e);
         }
@@ -75,12 +107,8 @@ public class RabbitMQProducer extends DefaultProducer {
     /**
      * If needed, close Connection and Channel
      */
-    private void closeConnectionAndChannel() throws IOException {
-        if (channel != null) {
-            log.debug("Closing channel: {}", channel);
-            channel.close();
-            channel = null;
-        }
+    private void closeConnectionAndChannel() throws Exception {
+        channelPool.close();
         if (conn != null) {
             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
             conn.close(closeTimeout);
@@ -113,13 +141,30 @@ public class RabbitMQProducer extends DefaultProducer {
             throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint());
         }
         byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
-        AMQP.BasicProperties.Builder properties = buildProperties(exchange);
+        AMQP.BasicProperties properties = buildProperties(exchange).build();
 
-        if (channel == null) {
+        basicPublish(exchangeName, key, properties, messageBodyBytes);
+    }
+
+    /**
+     * Send a message borrowing a channel from the pool
+     * @param exchange Target exchange
+     * @param routingKey Routing key
+     * @param properties Header properties
+     * @param body Body content
+     */
+    private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+        if (channelPool==null) {
             // Open connection and channel lazily
-            openConnectionAndChannel();
+            openConnectionAndChannelPool();
         }
-        channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
+        execute(new ChannelCallback<Void>() {
+            @Override
+            public Void doWithChannel(Channel channel) throws Exception {
+                channel.basicPublish(exchange, routingKey, properties, body);
+                return null;
+            }
+        });
     }
 
     AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
@@ -246,4 +291,36 @@ public class RabbitMQProducer extends DefaultProducer {
     public void setCloseTimeout(int closeTimeout) {
         this.closeTimeout = closeTimeout;
     }
+
+    /**
+     * Get maximum number of opened channel in pool
+     * @return Maximum number of opened channel in pool
+     */
+    public int getChannelPoolMaxSize() {
+            return channelPoolMaxSize;
+    }
+
+    /**
+     * Set maximum number of opened channel in pool
+     * @param channelPoolMaxSize Maximum number of opened channel in pool
+     */
+    public void setChannelPoolMaxSize(int channelPoolMaxSize) {
+            this.channelPoolMaxSize = channelPoolMaxSize;
+    }
+
+    /**
+     * Get the maximum number of milliseconds to wait for a channel from the pool
+     * @return Maximum number of milliseconds waiting for a channel
+     */
+    public long getChannelPoolMaxWait() {
+            return channelPoolMaxWait;
+    }
+
+    /**
+     * Set the maximum number of milliseconds to wait for a channel from the pool
+     * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
+     */
+    public void setChannelPoolMaxWait(long channelPoolMaxWait) {
+            this.channelPoolMaxWait = channelPoolMaxWait;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
new file mode 100644
index 0000000..b9bed13
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.rabbitmq.pool;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * Channel lifecycle manager: create, check and close channel
+ */
+public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+    /**
+     * Parent connection
+     */
+    private final Connection connection;
+
+    public PoolableChannelFactory(Connection connection) {
+        this.connection = connection;
+    }
+    
+    @Override
+    public Channel makeObject() throws Exception {
+        return connection.createChannel();
+    }
+
+    @Override
+    public void destroyObject(Channel t) throws Exception {
+        t.close();
+    }
+
+    @Override
+    public boolean validateObject(Channel t) {
+        return t.isOpen();
+    }
+
+    @Override
+    public void activateObject(Channel t) throws Exception {
+    }
+
+    @Override
+    public void passivateObject(Channel t) throws Exception {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index 6119082..f65b909 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -1,122 +1,135 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.rabbitmq;
-
-import java.io.IOException;
-
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
-
-import org.apache.camel.Produce;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import static org.junit.Assert.assertEquals;
-/**
- * Test RabbitMQ component with Spring DSL
- */
-@RunWith(CamelSpringJUnit4ClassRunner.class)
-@ContextConfiguration("RabbitMQSpringIntTest-context.xml")
-public class RabbitMQSpringIntTest {
-    @Produce(uri = "direct:rabbitMQ")
-    protected ProducerTemplate template;
-    @Autowired
-    private ConnectionFactory connectionFactory;
-    private Connection connection;
-    private Channel channel;
-
-    private Connection openConnection() throws IOException {
-        if (connection == null) {
-            connection = connectionFactory.newConnection();
-        }
-        return connection;
-    }
-
-    private Channel openChannel() throws IOException {
-        if (channel == null) {
-            channel = openConnection().createChannel();
-        }
-        return channel;
-    }
-
-    @Before
-    public void bindQueueExchange() throws IOException {
-        openChannel();
-        channel.exchangeDeclare("ex2", "direct", true, false, null);
-        channel.queueDeclare("q2", true, false, false, null);
-        channel.queueBind("q2", "ex2", "rk2");
-    }
-
-    @After
-    public void closeConnection() {
-        if (channel != null) {
-            try {
-                channel.close();
-            } catch (IOException e) {
-            }
-        }
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (IOException e) {
-            }
-        }
-    }
-
-    private static final class LastDeliveryConsumer extends DefaultConsumer {
-        private byte[] lastBody;
-
-        private LastDeliveryConsumer(Channel channel) {
-            super(channel);
-        }
-
-        @Override
-        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-            lastBody = body;
-            super.handleDelivery(consumerTag, envelope, properties, body);
-        }
-
-        public byte[] getLastBody() {
-            return lastBody;
-        }
-    }
-
-    @Test
-    public void testSendCsutomConnectionFactory() throws Exception {
-        String body = "Hello Rabbit";
-        template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
-
-        openChannel();
-        LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
-        channel.basicConsume("q2", true, consumer);
-        int i = 10;
-        while (consumer.getLastBody() == null && i > 0) {
-            Thread.sleep(1000L);
-            i--;
-        }
-        assertEquals(body, new String(consumer.getLastBody()));
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Test RabbitMQ component with Spring DSL
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class RabbitMQSpringIntTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate template;
+    @Autowired
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Channel channel;
+    private boolean isConnectionOpened() {
+        return connection!=null && connection.isOpen();
+    }
+    private Connection openConnection() throws IOException {
+        if (!isConnectionOpened()) {
+            LOGGER.info("Open connection");
+            connection = connectionFactory.newConnection();
+        }
+        return connection;
+    }
+    private boolean isChannelOpened() {
+        return channel != null && channel.isOpen();
+    }
+    private Channel openChannel() throws IOException {
+        if (!isChannelOpened()) {
+            LOGGER.info("Open channel");
+            channel = openConnection().createChannel();
+        }
+        return channel;
+    }
+
+    @Before
+    public void bindQueueExchange() throws IOException {
+        openChannel();
+        /*
+        LOGGER.info("Declare exchange queue");
+        channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>());
+        channel.queueDeclare("q2", true, false, false, null);
+        channel.queueBind("q2", "ex2", "rk2");
+        */
+    }
+
+    @After
+    public void closeConnection() {
+        if (isChannelOpened()) {
+            try {
+                LOGGER.info("Close channel");
+                channel.close();
+            } catch (IOException e) {
+            }
+        }
+        if (isConnectionOpened()) {
+            try {
+                LOGGER.info("Close connection");
+                connection.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    private static final class LastDeliveryConsumer extends DefaultConsumer {
+        private byte[] lastBody;
+
+        private LastDeliveryConsumer(Channel channel) {
+            super(channel);
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+            lastBody = body;
+            super.handleDelivery(consumerTag, envelope, properties, body);
+        }
+
+        public byte[] getLastBody() {
+            return lastBody;
+        }
+        public String getLastBodyAsString() {
+            return lastBody == null ? null : new String(lastBody);
+        }
+    }
+
+    @Test
+    public void testSendCsutomConnectionFactory() throws Exception {
+        String body = "Hello Rabbit";
+        template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+        openChannel();
+        LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+        channel.basicConsume("q2", true, consumer);
+        int i = 10;
+        while (consumer.getLastBody() == null && i > 0) {
+            Thread.sleep(1000L);
+            i--;
+        }
+        assertEquals(body, consumer.getLastBodyAsString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/3ad50186/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
index 6810583..b4688c7 100644
--- a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
+++ b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
@@ -21,7 +21,10 @@
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
-
+    <!-- To create and grant user cameltest:
+        rabbitmqctl add_user cameltest cameltest
+        rabbitmqctl set_permissions -p / cameltest ".*" ".*" ".*"
+    -->
     <!-- START SNIPPET: custom connection factory -->
     <bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
         <property name="host" value="localhost"/>
@@ -33,7 +36,7 @@
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:rabbitMQ"/>
-            <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&amp;queue=q2"/>
+            <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&amp;queue=q2&amp;routingKey=rk2"/>
         </route>
     </camelContext>
     <!-- END SNIPPET: example -->


[2/3] camel git commit: CAMEL-7421: Fixed CS

Posted by da...@apache.org.
CAMEL-7421: Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/00e93fc3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/00e93fc3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/00e93fc3

Branch: refs/heads/master
Commit: 00e93fc3b5664b3b830a163819c285a0150532f5
Parents: 3ad5018
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 4 07:34:40 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:34:40 2014 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQProducer.java    | 53 ++++++++++++--------
 .../rabbitmq/pool/PoolableChannelFactory.java   | 17 ++++---
 .../rabbitmq/RabbitMQSpringIntTest.java         | 33 ++++++------
 3 files changed, 59 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 755fa93..54562d5 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -16,20 +16,20 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ObjectHelper;
-
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.camel.Exchange;
 import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 
@@ -42,11 +42,12 @@ public class RabbitMQProducer extends DefaultProducer {
      */
     private int channelPoolMaxSize = 10;
     /**
-    * Maximum time (in milliseconds) waiting for channel
-    */
+     * Maximum time (in milliseconds) waiting for channel
+     */
     private long channelPoolMaxWait = 1000;
     private ObjectPool<Channel> channelPool;
     private ExecutorService executorService;
+
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
     }
@@ -55,12 +56,14 @@ public class RabbitMQProducer extends DefaultProducer {
     public RabbitMQEndpoint getEndpoint() {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
+
     /**
      * Channel callback (similar to Spring JDBC ConnectionCallback)
      */
     private static interface ChannelCallback<T> {
-        public T doWithChannel(Channel channel) throws Exception;
+        T doWithChannel(Channel channel) throws Exception;
     }
+
     /**
      * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
      */
@@ -72,6 +75,7 @@ public class RabbitMQProducer extends DefaultProducer {
             channelPool.returnObject(channel);
         }
     }
+
     /**
      * Open connection and initialize channel pool
      */
@@ -81,7 +85,7 @@ public class RabbitMQProducer extends DefaultProducer {
         log.debug("Created connection: {}", conn);
 
         log.trace("Creating channel pool...");
-        channelPool = new GenericObjectPool<>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
+        channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
         if (getEndpoint().isDeclare()) {
             execute(new ChannelCallback<Void>() {
                 @Override
@@ -147,14 +151,15 @@ public class RabbitMQProducer extends DefaultProducer {
     }
 
     /**
-     * Send a message borrowing a channel from the pool
-     * @param exchange Target exchange
+     * Send a message borrowing a channel from the pool.
+     *
+     * @param exchange   Target exchange
      * @param routingKey Routing key
      * @param properties Header properties
-     * @param body Body content
+     * @param body       Body content
      */
     private void basicPublish(final String exchange, final String routingKey, final AMQP.BasicProperties properties, final byte[] body) throws Exception {
-        if (channelPool==null) {
+        if (channelPool == null) {
             // Open connection and channel lazily
             openConnectionAndChannelPool();
         }
@@ -174,7 +179,7 @@ public class RabbitMQProducer extends DefaultProducer {
         if (contentType != null) {
             properties.contentType(contentType.toString());
         }
-        
+
         final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY);
         if (priority != null) {
             properties.priority(Integer.parseInt(priority.toString()));
@@ -263,8 +268,8 @@ public class RabbitMQProducer extends DefaultProducer {
     /**
      * Strategy to test if the given header is valid
      *
-     * @param headerValue  the header value
-     * @return  the value to use, <tt>null</tt> to ignore this header
+     * @param headerValue the header value
+     * @return the value to use, <tt>null</tt> to ignore this header
      * @see com.rabbitmq.client.impl.Frame#fieldValueSize
      */
     private Object getValidRabbitMQHeaderValue(Object headerValue) {
@@ -294,33 +299,37 @@ public class RabbitMQProducer extends DefaultProducer {
 
     /**
      * Get maximum number of opened channel in pool
+     *
      * @return Maximum number of opened channel in pool
      */
     public int getChannelPoolMaxSize() {
-            return channelPoolMaxSize;
+        return channelPoolMaxSize;
     }
 
     /**
      * Set maximum number of opened channel in pool
+     *
      * @param channelPoolMaxSize Maximum number of opened channel in pool
      */
     public void setChannelPoolMaxSize(int channelPoolMaxSize) {
-            this.channelPoolMaxSize = channelPoolMaxSize;
+        this.channelPoolMaxSize = channelPoolMaxSize;
     }
 
     /**
      * Get the maximum number of milliseconds to wait for a channel from the pool
+     *
      * @return Maximum number of milliseconds waiting for a channel
      */
     public long getChannelPoolMaxWait() {
-            return channelPoolMaxWait;
+        return channelPoolMaxWait;
     }
 
     /**
      * Set the maximum number of milliseconds to wait for a channel from the pool
+     *
      * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
      */
     public void setChannelPoolMaxWait(long channelPoolMaxWait) {
-            this.channelPoolMaxWait = channelPoolMaxWait;
+        this.channelPoolMaxWait = channelPoolMaxWait;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
index b9bed13..b10201f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/pool/PoolableChannelFactory.java
@@ -1,9 +1,10 @@
-/*
- * Copyright 2014 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -13,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.rabbitmq.pool;
 
 import com.rabbitmq.client.Channel;
@@ -21,9 +21,10 @@ import com.rabbitmq.client.Connection;
 import org.apache.commons.pool.PoolableObjectFactory;
 
 /**
- * Channel lifecyle manager: create, check and close channel
+ * Channel lifecycle manager: create, check and close channel
  */
 public class PoolableChannelFactory implements PoolableObjectFactory<Channel> {
+
     /**
      * Parent connection
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/00e93fc3/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index f65b909..6a3a3a5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -16,39 +16,48 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import com.rabbitmq.client.*;
+import java.io.IOException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import java.io.IOException;
-import java.util.HashMap;
-
 import static org.junit.Assert.assertEquals;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 /**
  * Test RabbitMQ component with Spring DSL
  */
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration
 public class RabbitMQSpringIntTest {
+
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSpringIntTest.class);
+
     @Produce(uri = "direct:rabbitMQ")
     protected ProducerTemplate template;
     @Autowired
     private ConnectionFactory connectionFactory;
     private Connection connection;
     private Channel channel;
+
     private boolean isConnectionOpened() {
-        return connection!=null && connection.isOpen();
+        return connection != null && connection.isOpen();
     }
+
     private Connection openConnection() throws IOException {
         if (!isConnectionOpened()) {
             LOGGER.info("Open connection");
@@ -56,9 +65,11 @@ public class RabbitMQSpringIntTest {
         }
         return connection;
     }
+
     private boolean isChannelOpened() {
         return channel != null && channel.isOpen();
     }
+
     private Channel openChannel() throws IOException {
         if (!isChannelOpened()) {
             LOGGER.info("Open channel");
@@ -70,12 +81,6 @@ public class RabbitMQSpringIntTest {
     @Before
     public void bindQueueExchange() throws IOException {
         openChannel();
-        /*
-        LOGGER.info("Declare exchange queue");
-        channel.exchangeDeclare("ex2", "direct", true, false, new HashMap<String, Object>());
-        channel.queueDeclare("q2", true, false, false, null);
-        channel.queueBind("q2", "ex2", "rk2");
-        */
     }
 
     @After
@@ -118,7 +123,7 @@ public class RabbitMQSpringIntTest {
     }
 
     @Test
-    public void testSendCsutomConnectionFactory() throws Exception {
+    public void testSendCustomConnectionFactory() throws Exception {
         String body = "Hello Rabbit";
         template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
 


[3/3] camel git commit: CAMEL-7421: Allow configuring the options from the endpoint, which is the standard way in Camel.

Posted by da...@apache.org.
CAMEL-7421: Allow configuring the options from the endpoint, which is the standard way in Camel.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a8dfb035
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a8dfb035
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a8dfb035

Branch: refs/heads/master
Commit: a8dfb03590029365418341a249627e4049acae9c
Parents: 00e93fc
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 4 07:43:43 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 4 07:43:43 2014 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQEndpoint.java    | 40 ++++++++++++++++
 .../component/rabbitmq/RabbitMQProducer.java    | 48 ++------------------
 .../features/src/main/resources/features.xml    |  1 +
 3 files changed, 44 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 645a572..f36fd82 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -95,6 +95,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private String deadLetterQueue;
     //Dead letter exchange type.
     private String deadLetterExchangeType = "direct";
+    //Maximum number of opened channel in pool
+    private int channelPoolMaxSize = 10;
+    //Maximum time (in milliseconds) waiting for channel
+    private long channelPoolMaxWait = 1000;
 
     public RabbitMQEndpoint() {
     }
@@ -539,4 +543,40 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public void setDeadLetterExchangeType(String deadLetterExchangeType) {
         this.deadLetterExchangeType = deadLetterExchangeType;
     }
+
+    /**
+     * Get maximum number of opened channel in pool
+     *
+     * @return Maximum number of opened channel in pool
+     */
+    public int getChannelPoolMaxSize() {
+        return channelPoolMaxSize;
+    }
+
+    /**
+     * Set maximum number of opened channel in pool
+     *
+     * @param channelPoolMaxSize Maximum number of opened channel in pool
+     */
+    public void setChannelPoolMaxSize(int channelPoolMaxSize) {
+        this.channelPoolMaxSize = channelPoolMaxSize;
+    }
+
+    /**
+     * Get the maximum number of milliseconds to wait for a channel from the pool
+     *
+     * @return Maximum number of milliseconds waiting for a channel
+     */
+    public long getChannelPoolMaxWait() {
+        return channelPoolMaxWait;
+    }
+
+    /**
+     * Set the maximum number of milliseconds to wait for a channel from the pool
+     *
+     * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
+     */
+    public void setChannelPoolMaxWait(long channelPoolMaxWait) {
+        this.channelPoolMaxWait = channelPoolMaxWait;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 54562d5..96bd516 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -35,18 +35,10 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 
 public class RabbitMQProducer extends DefaultProducer {
 
-    private int closeTimeout = 30 * 1000;
     private Connection conn;
-    /**
-     * Maximum number of opened channel in pool
-     */
-    private int channelPoolMaxSize = 10;
-    /**
-     * Maximum time (in milliseconds) waiting for channel
-     */
-    private long channelPoolMaxWait = 1000;
     private ObjectPool<Channel> channelPool;
     private ExecutorService executorService;
+    private int closeTimeout = 30 * 1000;
 
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
@@ -85,7 +77,8 @@ public class RabbitMQProducer extends DefaultProducer {
         log.debug("Created connection: {}", conn);
 
         log.trace("Creating channel pool...");
-        channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getChannelPoolMaxSize(), GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getChannelPoolMaxWait());
+        channelPool = new GenericObjectPool<Channel>(new PoolableChannelFactory(this.conn), getEndpoint().getChannelPoolMaxSize(),
+                GenericObjectPool.WHEN_EXHAUSTED_BLOCK, getEndpoint().getChannelPoolMaxWait());
         if (getEndpoint().isDeclare()) {
             execute(new ChannelCallback<Void>() {
                 @Override
@@ -297,39 +290,4 @@ public class RabbitMQProducer extends DefaultProducer {
         this.closeTimeout = closeTimeout;
     }
 
-    /**
-     * Get maximum number of opened channel in pool
-     *
-     * @return Maximum number of opened channel in pool
-     */
-    public int getChannelPoolMaxSize() {
-        return channelPoolMaxSize;
-    }
-
-    /**
-     * Set maximum number of opened channel in pool
-     *
-     * @param channelPoolMaxSize Maximum number of opened channel in pool
-     */
-    public void setChannelPoolMaxSize(int channelPoolMaxSize) {
-        this.channelPoolMaxSize = channelPoolMaxSize;
-    }
-
-    /**
-     * Get the maximum number of milliseconds to wait for a channel from the pool
-     *
-     * @return Maximum number of milliseconds waiting for a channel
-     */
-    public long getChannelPoolMaxWait() {
-        return channelPoolMaxWait;
-    }
-
-    /**
-     * Set the maximum number of milliseconds to wait for a channel from the pool
-     *
-     * @param channelPoolMaxWait Maximum number of milliseconds waiting for a channel
-     */
-    public void setChannelPoolMaxWait(long channelPoolMaxWait) {
-        this.channelPoolMaxWait = channelPoolMaxWait;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a8dfb035/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index d335618..51505f8 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1069,6 +1069,7 @@
   <feature name='camel-rabbitmq' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
     <bundle dependency='true'>mvn:com.rabbitmq/amqp-client/${rabbitmq-amqp-client-version}</bundle>
+    <bundle dependency='true'>mvn:commons-pool/commons-pool/${commons-pool-version}</bundle>    
     <bundle>mvn:org.apache.camel/camel-rabbitmq/${project.version}</bundle>
   </feature>
   <feature name='camel-restlet' version='${project.version}' resolver='(obr)' start-level='50'>