You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/06 07:02:54 UTC

[1/4] git commit: RabbitMQ - Spring DSL integration test

Repository: camel
Updated Branches:
  refs/heads/master 69b00a31f -> 686d5fcca


RabbitMQ - Spring DSL integration test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0762be7e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0762be7e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0762be7e

Branch: refs/heads/master
Commit: 0762be7e581fce84e761442b8eb3abfe23be2af1
Parents: 2b0513d
Author: Gerald Quintana <ge...@zenika.com>
Authored: Mon May 5 15:43:20 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Mon May 5 15:44:03 2014 +0200

----------------------------------------------------------------------
 components/camel-rabbitmq/pom.xml               |  17 ++--
 .../rabbitmq/RabbitMQSpringIntTest.java         | 102 +++++++++++++++++++
 .../rabbitmq/RabbitMQSpringIntTest-context.xml  |  40 ++++++++
 3 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 613025e..4e14471 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -48,12 +48,17 @@
     </dependency>
 
     <!-- testing -->
-    <dependency>
-      <groupId>org.apache.camel</groupId>
-      <artifactId>camel-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
+      <dependency>
+          <groupId>org.apache.camel</groupId>
+          <artifactId>camel-test-spring</artifactId>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.camel</groupId>
+          <artifactId>camel-core-xml</artifactId>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
new file mode 100644
index 0000000..baa4bde
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -0,0 +1,102 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test RabbitMQ component with Spring DSL
+ */
+@RunWith(CamelSpringJUnit4ClassRunner.class)
+@ContextConfiguration("RabbitMQSpringIntTest-context.xml")
+public class RabbitMQSpringIntTest {
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate template;
+    @Autowired
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Channel channel;
+
+    private Connection openConnection() throws IOException {
+        if (connection == null) {
+            connection = connectionFactory.newConnection();
+        }
+        return connection;
+    }
+
+    private Channel openChannel() throws IOException {
+        if (channel == null) {
+            channel = openConnection().createChannel();
+        }
+        return channel;
+    }
+
+    @Before
+    public void bindQueueExchange() throws IOException {
+        openChannel();
+        channel.exchangeDeclare("ex2", "direct", true, false, null);
+        channel.queueDeclare("q2", true, false, false, null);
+        channel.queueBind("q2", "ex2", "rk2");
+    }
+
+    @After
+    public void closeConnection() {
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+
+    private static class LastDeliveryConsumer extends DefaultConsumer {
+        private byte[] lastBody;
+
+        private LastDeliveryConsumer(Channel channel) {
+            super(channel);
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+            lastBody = body;
+            super.handleDelivery(consumerTag, envelope, properties, body);
+        }
+
+        public byte[] getLastBody() {
+            return lastBody;
+        }
+    }
+
+    @Test
+    public void testSendCsutomConnectionFactory() throws Exception {
+        String body = "Hello Rabbit";
+        template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+        openChannel();
+        LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+        channel.basicConsume("q2", true, consumer);
+        int i = 10;
+        while (consumer.getLastBody() == null && i > 0) {
+            Thread.sleep(1000L);
+            i--;
+        }
+        assertEquals(body, new String(consumer.getLastBody()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
new file mode 100644
index 0000000..6810583
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+    <!-- START SNIPPET: custom connection factory -->
+    <bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
+        <property name="host" value="localhost"/>
+        <property name="port" value="5672"/>
+        <property name="username" value="cameltest"/>
+        <property name="password" value="cameltest"/>
+    </bean>
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:rabbitMQ"/>
+            <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&amp;queue=q2"/>
+        </route>
+    </camelContext>
+    <!-- END SNIPPET: custom connection factory -->
+</beans>
\ No newline at end of file


[2/4] git commit: Reconnect RabbitMQ Producer/Consumer after broker was unavailable

Posted by ni...@apache.org.
Reconnect RabbitMQ Producer/Consumer after broker was unavailable


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1191e19f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1191e19f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1191e19f

Branch: refs/heads/master
Commit: 1191e19fc194b545931530011511a55a8d6d6431
Parents: 0762be7
Author: Gerald Quintana <ge...@zenika.com>
Authored: Mon May 5 15:47:01 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Mon May 5 15:47:01 2014 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 112 +++++++++++++++----
 .../component/rabbitmq/RabbitMQEndpoint.java    |  22 ++++
 .../component/rabbitmq/RabbitMQProducer.java    |  33 +++++-
 .../rabbitmq/RabbitMQEndpointTest.java          |  30 +++--
 .../rabbitmq/RabbitMQReConnectionIntTest.java   |  77 +++++++++++++
 5 files changed, 232 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 501f2a1..0bdc93b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,8 +17,9 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
@@ -36,6 +37,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
     private int closeTimeout = 30 * 1000;
     
     private final RabbitMQEndpoint endpoint;
+    /**
+     * Task in charge of starting consumer
+     */
+    private StartConsumerCallable startConsumerCallable;
 
     public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -43,37 +48,56 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     @Override
-    protected void doStart() throws Exception {
-        executor = endpoint.createExecutor();
-        log.debug("Using executor {}", executor);
-
-        conn = endpoint.connect(executor);
-        log.debug("Using conn {}", conn);
-
-        channel = conn.createChannel();
-        log.debug("Using channel {}", channel);
+    public RabbitMQEndpoint getEndpoint() {
+        return (RabbitMQEndpoint) super.getEndpoint();
+    }
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                endpoint.getExchangeType(),
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
-                new HashMap<String, Object>());
+    /**
+     * Open connection and channel
+     */
+    private void openConnectionAndChannel() throws IOException {
+        log.trace("Creating connection...");
+        this.conn = getEndpoint().connect(executor);
+        log.debug("Created connection: {}", conn);
+
+        log.trace("Creating channel...");
+        this.channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+    }
 
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
-                endpoint.isAutoDelete(), null);
-        channel.queueBind(
-                endpoint.getQueue(),
-                endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : endpoint
-                        .getRoutingKey());
+    /**
+     * If needed, create Exchange and Queue, then add message listener
+     */
+    private void addConsumer() throws IOException {
+        getEndpoint().declareExchangeAndQueue(channel);
 
         channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
                 new RabbitConsumer(this, channel));
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected void doStart() throws Exception {
+        executor = endpoint.createExecutor();
+        log.debug("Using executor {}", executor);
+        try {
+            openConnectionAndChannel();
+            addConsumer();
+        } catch (Exception e) {
+            // Open connection, and start message listener in background
+            Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+            final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L;
+            startConsumerCallable=new StartConsumerCallable(connectionRetryInterval);
+            executor.submit(startConsumerCallable);
+        }
+    }
+
+    /**
+     * If needed, close Connection and Channel
+     */
+    private void closeConnectionAndChannel() throws IOException {
+        if (startConsumerCallable!=null) {
+            startConsumerCallable.stop();
+        }
         if (channel != null) {
             log.debug("Closing channel: {}", channel);
             channel.close();
@@ -84,6 +108,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
             conn.close(closeTimeout);
             conn = null;
         }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        closeConnectionAndChannel();
+
         if (executor != null) {
             if (endpoint != null && endpoint.getCamelContext() != null) {
                 endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -178,4 +208,38 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
     }
 
+    /**
+     * Task in charge of opening connection and adding listener when consumer is started
+     * and broker is not avaiblable.
+     */
+    private class StartConsumerCallable implements Callable<Void> {
+        private final long connectionRetryInterval;
+        private final AtomicBoolean running=new AtomicBoolean(true);
+        public StartConsumerCallable(long connectionRetryInterval) {
+            this.connectionRetryInterval = connectionRetryInterval;
+        }
+        public void stop() {
+            running.set(false);
+            RabbitMQConsumer.this.startConsumerCallable=null;
+        }
+        @Override
+        public Void call() throws Exception {
+            boolean connectionFailed=true;
+            // Reconnection loop
+            while (running.get() && connectionFailed) {
+                try {
+                    openConnectionAndChannel();
+                    connectionFailed=false;
+                } catch (Exception e) {
+                    log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e);
+                    Thread.sleep(connectionRetryInterval);
+                }
+            }
+            if (!connectionFailed) {
+                addConsumer();
+            }
+            stop();
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 646a633..7f87f53 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -29,6 +30,7 @@ import javax.net.ssl.TrustManager;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
@@ -126,6 +128,26 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         }
     }
 
+	/**
+     * If needed, declare Exchange, declare Queue and bind them with Routing Key
+     */
+    public void declareExchangeAndQueue(Channel channel) throws IOException {
+        channel.exchangeDeclare(getExchangeName(),
+                getExchangeType(),
+                isDurable(),
+                isAutoDelete(),
+                new HashMap<String, Object>());
+        if (getQueue()!=null) {
+            // need to make sure the queueDeclare is same with the exchange declare
+            channel.queueDeclare(getQueue(), isDurable(), false,
+                    isAutoDelete(), null);
+            channel.queueBind(
+                    getQueue(),
+                    getExchangeName(),
+                    getRoutingKey() == null ? "" : getRoutingKey());
+        }
+    }
+
     private ConnectionFactory getOrCreateConnectionFactory() {
         if (connectionFactory == null) {
             ConnectionFactory factory = new ConnectionFactory();

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 7763423..03177e2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -45,11 +45,10 @@ public class RabbitMQProducer extends DefaultProducer {
     public RabbitMQEndpoint getEndpoint() {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
-
-    @Override
-    protected void doStart() throws Exception {
-        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
-
+    /**
+     * Open connection and channel
+     */
+    private void openConnectionAndChannel() throws IOException {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executorService);
         log.debug("Created connection: {}", conn);
@@ -60,7 +59,20 @@ public class RabbitMQProducer extends DefaultProducer {
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected void doStart() throws Exception {
+        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
+
+        try {
+            openConnectionAndChannel();
+        } catch (IOException e) {
+            log.warn("Failed to create connection", e);
+        }
+    }
+
+    /**
+     * If needed, close Connection and Channel
+     */
+    private void closeConnectionAndChannel() throws IOException {
         if (channel != null) {
             log.debug("Closing channel: {}", channel);
             channel.close();
@@ -71,6 +83,11 @@ public class RabbitMQProducer extends DefaultProducer {
             conn.close(closeTimeout);
             conn = null;
         }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        closeConnectionAndChannel();
         if (executorService != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
             executorService = null;
@@ -95,6 +112,10 @@ public class RabbitMQProducer extends DefaultProducer {
         byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
         AMQP.BasicProperties.Builder properties = buildProperties(exchange);
 
+        if (channel==null) {
+            // Open connection and channel lazily
+            openConnectionAndChannel();
+        }
         channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 11b3675..2db5005 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.rabbitmq;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Date;
 import java.util.HashMap;
@@ -26,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.impl.LongStringHelper;
@@ -132,7 +132,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     }
 
     private ConnectionFactory createConnectionFactory(String uri) {
-        RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); 
+        RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
+		try {
+			endpoint.connect(Executors.newSingleThreadExecutor());
+		} catch(IOException ioExc) {
+			// Doesn't matter if RabbitMQ is not available
+			log.debug("RabbitMQ not available", ioExc);
+		}
         return endpoint.getConnectionFactory();
     }
 
@@ -158,16 +164,16 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     @Test
     public void testCreateConnectionFactoryCustom() throws Exception {
         ConnectionFactory connectionFactory  = createConnectionFactory("rabbitmq:localhost:1234/exchange"
-                                                                      + "?username=userxxx"
-                                                                      + "&password=passxxx"
-                                                                      + "&connectionTimeout=123"
-                                                                      + "&requestedChannelMax=456"
-                                                                      + "&requestedFrameMax=789"
-                                                                      + "&requestedHeartbeat=987"
-                                                                      + "&sslProtocol=true"
-                                                                      + "&automaticRecoveryEnabled=true"
-                                                                      + "&networkRecoveryInterval=654"
-                                                                      + "&topologyRecoveryEnabled=false");
+				+ "?username=userxxx"
+				+ "&password=passxxx"
+				+ "&connectionTimeout=123"
+				+ "&requestedChannelMax=456"
+				+ "&requestedFrameMax=789"
+				+ "&requestedHeartbeat=987"
+				+ "&sslProtocol=true"
+				+ "&automaticRecoveryEnabled=true"
+				+ "&networkRecoveryInterval=654"
+				+ "&topologyRecoveryEnabled=false");
 
         assertEquals("localhost", connectionFactory.getHost());
         assertEquals(1234, connectionFactory.getPort());

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
new file mode 100644
index 0000000..6a63948
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -0,0 +1,77 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
+ * is not avaibable.
+ * <ul>
+ *     <li>Stop the broker</li>
+ *     <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ *     <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * </ul>
+ */
+public class RabbitMQReConnectionIntTest extends CamelTestSupport {
+    private static final String EXCHANGE = "ex3";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
+            "&queue=q3&routingKey=rk3"+
+            "&automaticRecoveryEnabled=true" +
+            "&requestedHeartbeat=1000" +
+            "&connectionTimeout=5000")
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .onException(AlreadyClosedException.class, ConnectException.class)
+                            .maximumRedeliveries(10)
+                            .redeliveryDelay(500L)
+                            .end()
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .to(mockEndpoint);
+            }
+        };
+    }
+    @Test
+    public void testSendEndReceive() throws Exception {
+        int nbMessages=100;
+        int failedMessages=0;
+        for(int i=0;i<nbMessages;i++) {
+            try {
+                directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3");
+            } catch (CamelExecutionException e) {
+                log.debug("Can not send message", e);
+                failedMessages++;
+            }
+            Thread.sleep(500L);
+        }
+        mockEndpoint.assertExchangeReceived(nbMessages-failedMessages);
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+}


[3/4] git commit: Fix and improve Re-connection integration test

Posted by ni...@apache.org.
Fix and improve Re-connection integration test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8029d0c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8029d0c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8029d0c7

Branch: refs/heads/master
Commit: 8029d0c7663504b248f94f7daf36eda3e0b61060
Parents: 1191e19
Author: Gerald Quintana <ge...@zenika.com>
Authored: Tue May 6 22:42:11 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Wed May 7 09:02:21 2014 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    |  6 +--
 .../component/rabbitmq/RabbitMQProducer.java    |  2 +
 .../rabbitmq/RabbitMQReConnectionIntTest.java   | 42 +++++++++++---------
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 0bdc93b..18ed62f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -63,14 +63,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
         log.trace("Creating channel...");
         this.channel = conn.createChannel();
         log.debug("Created channel: {}", channel);
-    }
+
+		getEndpoint().declareExchangeAndQueue(channel);
+	}
 
     /**
      * If needed, create Exchange and Queue, then add message listener
      */
     private void addConsumer() throws IOException {
-        getEndpoint().declareExchangeAndQueue(channel);
-
         channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
                 new RabbitConsumer(this, channel));
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 03177e2..f8596c9 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -56,6 +56,8 @@ public class RabbitMQProducer extends DefaultProducer {
         log.trace("Creating channel...");
         this.channel = conn.createChannel();
         log.debug("Created channel: {}", channel);
+
+		getEndpoint().declareExchangeAndQueue(this.channel);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 6a63948..512aed4 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -14,11 +14,11 @@ import java.util.concurrent.TimeUnit;
  * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
  * is not available.
  * <ul>
- *     <li>Stop the broker</li>
- *     <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
- *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
- *     <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
- *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker</li>
+ * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
  * </ul>
  */
 public class RabbitMQReConnectionIntTest extends CamelTestSupport {
@@ -28,14 +28,17 @@ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
     protected ProducerTemplate directProducer;
 
     @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
-            "&queue=q3&routingKey=rk3"+
+            "&queue=q3&routingKey=rk3" +
             "&automaticRecoveryEnabled=true" +
             "&requestedHeartbeat=1000" +
             "&connectionTimeout=5000")
     private Endpoint rabbitMQEndpoint;
 
-    @EndpointInject(uri = "mock:result")
-    private MockEndpoint mockEndpoint;
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -46,32 +49,35 @@ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
                 from("direct:rabbitMQ")
                         .id("producingRoute")
                         .onException(AlreadyClosedException.class, ConnectException.class)
-                            .maximumRedeliveries(10)
-                            .redeliveryDelay(500L)
-                            .end()
+                        .maximumRedeliveries(10)
+                        .redeliveryDelay(500L)
+                        .end()
                         .log("Sending message")
-                        .inOnly(rabbitMQEndpoint);
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
                 from(rabbitMQEndpoint)
                         .id("consumingRoute")
                         .log("Receiving message")
-                        .to(mockEndpoint);
+                        .to(consumingMockEndpoint);
             }
         };
     }
+
     @Test
     public void testSendEndReceive() throws Exception {
-        int nbMessages=100;
-        int failedMessages=0;
-        for(int i=0;i<nbMessages;i++) {
+        int nbMessages = 50;
+        int failedMessages = 0;
+        for (int i = 0; i < nbMessages; i++) {
             try {
-                directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3");
+                directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, "rk3");
             } catch (CamelExecutionException e) {
                 log.debug("Can not send message", e);
                 failedMessages++;
             }
             Thread.sleep(500L);
         }
-        mockEndpoint.assertExchangeReceived(nbMessages-failedMessages);
+        producingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+        consumingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
     }
 }


[4/4] git commit: CAMEL-6869 merged the patch of Gerald

Posted by ni...@apache.org.
CAMEL-6869 merged the patch of Gerald


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/686d5fcc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/686d5fcc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/686d5fcc

Branch: refs/heads/master
Commit: 686d5fccac3e0308026548b58dfc90c31e41b5c9
Parents: 69b00a3 8029d0c
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Jun 6 10:58:16 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Jun 6 10:58:16 2014 +0800

----------------------------------------------------------------------
 components/camel-rabbitmq/pom.xml               |  17 ++-
 .../component/rabbitmq/RabbitMQConsumer.java    | 114 +++++++++++++----
 .../component/rabbitmq/RabbitMQEndpoint.java    |  36 ++++--
 .../component/rabbitmq/RabbitMQProducer.java    |  35 +++++-
 .../rabbitmq/RabbitMQEndpointTest.java          |  36 +++---
 .../rabbitmq/RabbitMQReConnectionIntTest.java   | 102 ++++++++++++++++
 .../rabbitmq/RabbitMQSpringIntTest.java         | 122 +++++++++++++++++++
 .../rabbitmq/RabbitMQSpringIntTest-context.xml  |  40 ++++++
 8 files changed, 443 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 8a9b624,18ed62f..0f1d85f
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@@ -43,42 -48,56 +48,61 @@@ public class RabbitMQConsumer extends D
      }
  
      @Override
-     protected void doStart() throws Exception {
-         executor = endpoint.createExecutor();
-         log.debug("Using executor {}", executor);
- 
-         conn = endpoint.connect(executor);
-         log.debug("Using conn {}", conn);
 +
-         channel = conn.createChannel();
-         log.debug("Using channel {}", channel);
+     public RabbitMQEndpoint getEndpoint() {
+         return (RabbitMQEndpoint) super.getEndpoint();
+     }
  
+     /**
+      * Open connection and channel
+      */
+     private void openConnectionAndChannel() throws IOException {
+         log.trace("Creating connection...");
+         this.conn = getEndpoint().connect(executor);
+         log.debug("Created connection: {}", conn);
+ 
+         log.trace("Creating channel...");
+         this.channel = conn.createChannel();
+         log.debug("Created channel: {}", channel);
 -
 -		getEndpoint().declareExchangeAndQueue(channel);
 -	}
++        // setup the basicQos
 +        if (endpoint.isPrefetchEnabled()) {
 +            channel.basicQos(endpoint.getPrefetchSize(), 
 +                             endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
 +        }
-         
-         channel.exchangeDeclare(endpoint.getExchangeName(),
-                 endpoint.getExchangeType(),
-                 endpoint.isDurable(),
-                 endpoint.isAutoDelete(),
-                 new HashMap<String, Object>());
- 
-         // need to make sure the queueDeclare is same with the exchange declare
-         channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
-                 endpoint.isAutoDelete(), null);
-         channel.queueBind(
-                 endpoint.getQueue(),
-                 endpoint.getExchangeName(),
-                 endpoint.getRoutingKey() == null ? "" : endpoint
-                         .getRoutingKey());
++        getEndpoint().declareExchangeAndQueue(channel);
++    }
  
+     /**
+      * Add message listener (the Exchange and Queue are declared when the channel is opened)
+      */
+     private void addConsumer() throws IOException {
          channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
                  new RabbitConsumer(this, channel));
      }
  
      @Override
-     protected void doStop() throws Exception {
+     protected void doStart() throws Exception {
+         executor = endpoint.createExecutor();
+         log.debug("Using executor {}", executor);
+         try {
+             openConnectionAndChannel();
+             addConsumer();
+         } catch (Exception e) {
+             // Open connection, and start message listener in background
+             Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
 -            final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L;
 -            startConsumerCallable=new StartConsumerCallable(connectionRetryInterval);
++            final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
++            startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+             executor.submit(startConsumerCallable);
+         }
+     }
+ 
+     /**
+      * If needed, close Connection and Channel
+      */
+     private void closeConnectionAndChannel() throws IOException {
 -        if (startConsumerCallable!=null) {
++        if (startConsumerCallable != null) {
+             startConsumerCallable.stop();
+         }
          if (channel != null) {
              log.debug("Closing channel: {}", channel);
              channel.close();
@@@ -183,4 -208,38 +213,38 @@@
  
      }
  
+     /**
+      * Task in charge of opening connection and adding listener when consumer is started
 -     * and broker is not avaiblable.
++     * and broker is not available.
+      */
+     private class StartConsumerCallable implements Callable<Void> {
+         private final long connectionRetryInterval;
 -        private final AtomicBoolean running=new AtomicBoolean(true);
++        private final AtomicBoolean running = new AtomicBoolean(true);
+         public StartConsumerCallable(long connectionRetryInterval) {
+             this.connectionRetryInterval = connectionRetryInterval;
+         }
+         public void stop() {
+             running.set(false);
 -            RabbitMQConsumer.this.startConsumerCallable=null;
++            RabbitMQConsumer.this.startConsumerCallable = null;
+         }
+         @Override
+         public Void call() throws Exception {
 -            boolean connectionFailed=true;
++            boolean connectionFailed = true;
+             // Reconnection loop
+             while (running.get() && connectionFailed) {
+                 try {
+                     openConnectionAndChannel();
 -                    connectionFailed=false;
++                    connectionFailed = false;
+                 } catch (Exception e) {
 -                    log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e);
++                    log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
+                     Thread.sleep(connectionRetryInterval);
+                 }
+             }
+             if (!connectionFailed) {
+                 addConsumer();
+             }
+             stop();
+             return null;
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 9400935,7f87f53..e475819
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@@ -71,15 -73,7 +73,15 @@@ public class RabbitMQEndpoint extends D
      private Boolean automaticRecoveryEnabled;
      private Integer networkRecoveryInterval;
      private Boolean topologyRecoveryEnabled;
-     
+ 
 +    //If it is true, prefetchSize, prefetchCount, prefetchGlobal will be used for basicQos before starting RabbitMQConsumer
 +    private boolean prefetchEnabled;
 +    //Default in RabbitMq is 0.
 +    private int prefetchSize;
 +    private int prefetchCount;
 +    //Default value in RabbitMQ is false.
 +    private boolean prefetchGlobal;
-     
++
      public RabbitMQEndpoint() {
      }
  
@@@ -134,7 -128,27 +136,27 @@@
          }
      }
  
-     protected ConnectionFactory getOrCreateConnectionFactory() {
 -	/**
++    /**
+      * If needed, declare Exchange, declare Queue and bind them with Routing Key
+      */
+     public void declareExchangeAndQueue(Channel channel) throws IOException {
+         channel.exchangeDeclare(getExchangeName(),
+                 getExchangeType(),
+                 isDurable(),
+                 isAutoDelete(),
+                 new HashMap<String, Object>());
 -        if (getQueue()!=null) {
++        if (getQueue() != null) {
+             // need to make sure the queueDeclare is same with the exchange declare
+             channel.queueDeclare(getQueue(), isDurable(), false,
+                     isAutoDelete(), null);
+             channel.queueBind(
+                     getQueue(),
+                     getExchangeName(),
+                     getRoutingKey() == null ? "" : getRoutingKey());
+         }
+     }
+ 
+     private ConnectionFactory getOrCreateConnectionFactory() {
          if (connectionFactory == null) {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setUsername(getUsername());
@@@ -299,22 -313,22 +321,22 @@@
      public void setRoutingKey(String routingKey) {
          this.routingKey = routingKey;
      }
--    
++
      public void setBridgeEndpoint(boolean bridgeEndpoint) {
          this.bridgeEndpoint = bridgeEndpoint;
      }
--    
++
      public boolean isBridgeEndpoint() {
          return bridgeEndpoint;
      }
--    
++
      public void setAddresses(String addresses) {
          Address[] addressArray = Address.parseAddresses(addresses);
          if (addressArray.length > 0) {
              this.addresses = addressArray;
          }
      }
--    
++
      public Address[] getAddresses() {
          return addresses;
      }

http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 7763423,f8596c9..2e22d3f
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@@ -57,6 -56,8 +56,8 @@@ public class RabbitMQProducer extends D
          log.trace("Creating channel...");
          this.channel = conn.createChannel();
          log.debug("Created channel: {}", channel);
+ 
 -		getEndpoint().declareExchangeAndQueue(this.channel);
++        getEndpoint().declareExchangeAndQueue(this.channel);
      }
  
      @Override
@@@ -95,6 -114,10 +114,10 @@@
          byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
          AMQP.BasicProperties.Builder properties = buildProperties(exchange);
  
 -        if (channel==null) {
++        if (channel == null) {
+             // Open connection and channel lazily
+             openConnectionAndChannel();
+         }
          channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
      }
  

http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 86a7bcc,2db5005..afae40d
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@@ -109,7 -109,7 +109,7 @@@ public class RabbitMQEndpointTest exten
          ThreadPoolExecutor executor = assertIsInstanceOf(ThreadPoolExecutor.class,  endpoint.createExecutor());
          assertEquals(20, executor.getCorePoolSize());
      }
--    
++
      @Test
      public void createEndpointWithAutoAckDisabled() throws Exception {
          RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?autoAck=false", RabbitMQEndpoint.class);
@@@ -122,7 -122,7 +122,7 @@@
  
          assertTrue(endpoint.isSingleton());
      }
--    
++
      @Test
      public void brokerEndpointAddressesSettings() throws Exception {
          RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class);
@@@ -133,7 -133,13 +133,13 @@@
  
      private ConnectionFactory createConnectionFactory(String uri) {
          RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
-         return endpoint.getOrCreateConnectionFactory();
 -		try {
 -			endpoint.connect(Executors.newSingleThreadExecutor());
 -		} catch(IOException ioExc) {
 -			// Doesn't matter if RabbitMQ is not available
 -			log.debug("RabbitMQ not available", ioExc);
 -		}
++        try {
++            endpoint.connect(Executors.newSingleThreadExecutor());
++        } catch (IOException ioExc) {
++            // Doesn't matter if RabbitMQ is not available
++            log.debug("RabbitMQ not available", ioExc);
++        }
+         return endpoint.getConnectionFactory();
      }
  
      @Test
@@@ -157,17 -163,17 +163,17 @@@
  
      @Test
      public void testCreateConnectionFactoryCustom() throws Exception {
--        ConnectionFactory connectionFactory  = createConnectionFactory("rabbitmq:localhost:1234/exchange"
-                                                                       + "?username=userxxx"
-                                                                       + "&password=passxxx"
-                                                                       + "&connectionTimeout=123"
-                                                                       + "&requestedChannelMax=456"
-                                                                       + "&requestedFrameMax=789"
-                                                                       + "&requestedHeartbeat=987"
-                                                                       + "&sslProtocol=true"
-                                                                       + "&automaticRecoveryEnabled=true"
-                                                                       + "&networkRecoveryInterval=654"
-                                                                       + "&topologyRecoveryEnabled=false");
 -				+ "?username=userxxx"
 -				+ "&password=passxxx"
 -				+ "&connectionTimeout=123"
 -				+ "&requestedChannelMax=456"
 -				+ "&requestedFrameMax=789"
 -				+ "&requestedHeartbeat=987"
 -				+ "&sslProtocol=true"
 -				+ "&automaticRecoveryEnabled=true"
 -				+ "&networkRecoveryInterval=654"
 -				+ "&topologyRecoveryEnabled=false");
++        ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"
++            + "?username=userxxx"
++            + "&password=passxxx"
++            + "&connectionTimeout=123"
++            + "&requestedChannelMax=456"
++            + "&requestedFrameMax=789"
++            + "&requestedHeartbeat=987"
++            + "&sslProtocol=true"
++            + "&automaticRecoveryEnabled=true"
++            + "&networkRecoveryInterval=654"
++            + "&topologyRecoveryEnabled=false");
  
          assertEquals("localhost", connectionFactory.getHost());
          assertEquals(1234, connectionFactory.getPort());

http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 0000000,512aed4..302440c
mode 000000,100644..100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@@ -1,0 -1,83 +1,102 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+ package org.apache.camel.component.rabbitmq;
+ 
++import java.net.ConnectException;
++import java.util.concurrent.TimeUnit;
++
+ import com.rabbitmq.client.AlreadyClosedException;
 -import org.apache.camel.*;
++
++import org.apache.camel.CamelExecutionException;
++import org.apache.camel.Endpoint;
++import org.apache.camel.EndpointInject;
++import org.apache.camel.Produce;
++import org.apache.camel.ProducerTemplate;
+ import org.apache.camel.builder.RouteBuilder;
+ import org.apache.camel.component.mock.MockEndpoint;
+ import org.apache.camel.test.junit4.CamelTestSupport;
+ import org.junit.Test;
+ 
 -import java.net.ConnectException;
 -import java.util.concurrent.TimeUnit;
 -
+ /**
+  * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
+  * is not available.
+  * <ul>
+  * <li>Stop the broker</li>
+  * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+  * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+  * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+  * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+  * </ul>
+  */
+ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
+     private static final String EXCHANGE = "ex3";
+ 
+     @Produce(uri = "direct:rabbitMQ")
+     protected ProducerTemplate directProducer;
+ 
 -    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
 -            "&queue=q3&routingKey=rk3" +
 -            "&automaticRecoveryEnabled=true" +
 -            "&requestedHeartbeat=1000" +
 -            "&connectionTimeout=5000")
++    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest"
++                          + "&queue=q3&routingKey=rk3" + "&automaticRecoveryEnabled=true"
++                          + "&requestedHeartbeat=1000" + "&connectionTimeout=5000")
+     private Endpoint rabbitMQEndpoint;
+ 
+     @EndpointInject(uri = "mock:producing")
+     private MockEndpoint producingMockEndpoint;
+ 
+     @EndpointInject(uri = "mock:consuming")
+     private MockEndpoint consumingMockEndpoint;
+ 
+     @Override
+     protected RouteBuilder createRouteBuilder() throws Exception {
+         return new RouteBuilder() {
+ 
+             @Override
+             public void configure() throws Exception {
+                 from("direct:rabbitMQ")
+                         .id("producingRoute")
+                         .onException(AlreadyClosedException.class, ConnectException.class)
+                         .maximumRedeliveries(10)
+                         .redeliveryDelay(500L)
+                         .end()
+                         .log("Sending message")
+                         .inOnly(rabbitMQEndpoint)
+                         .to(producingMockEndpoint);
+                 from(rabbitMQEndpoint)
+                         .id("consumingRoute")
+                         .log("Receiving message")
+                         .to(consumingMockEndpoint);
+             }
+         };
+     }
+ 
+     @Test
+     public void testSendEndReceive() throws Exception {
+         int nbMessages = 50;
+         int failedMessages = 0;
+         for (int i = 0; i < nbMessages; i++) {
+             try {
+                 directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, "rk3");
+             } catch (CamelExecutionException e) {
+                 log.debug("Can not send message", e);
+                 failedMessages++;
+             }
+             Thread.sleep(500L);
+         }
+         producingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+         consumingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index 0000000,baa4bde..6119082
mode 000000,100644..100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@@ -1,0 -1,102 +1,122 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+ package org.apache.camel.component.rabbitmq;
+ 
 -import com.rabbitmq.client.*;
++import java.io.IOException;
++
++import com.rabbitmq.client.AMQP;
++import com.rabbitmq.client.Channel;
++import com.rabbitmq.client.Connection;
++import com.rabbitmq.client.ConnectionFactory;
++import com.rabbitmq.client.DefaultConsumer;
++import com.rabbitmq.client.Envelope;
++
+ import org.apache.camel.Produce;
+ import org.apache.camel.ProducerTemplate;
+ import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.test.context.ContextConfiguration;
 -
 -import java.io.IOException;
 -
+ import static org.junit.Assert.assertEquals;
 -
+ /**
+  * Test RabbitMQ component with Spring DSL
+  */
+ @RunWith(CamelSpringJUnit4ClassRunner.class)
+ @ContextConfiguration("RabbitMQSpringIntTest-context.xml")
+ public class RabbitMQSpringIntTest {
+     @Produce(uri = "direct:rabbitMQ")
+     protected ProducerTemplate template;
+     @Autowired
+     private ConnectionFactory connectionFactory;
+     private Connection connection;
+     private Channel channel;
+ 
+     /**
+      * Returns the connection used by this test, creating it from the injected
+      * {@code connectionFactory} the first time it is requested.
+      *
+      * @return the cached connection
+      * @throws IOException if the factory fails to create the connection
+      */
+     private Connection openConnection() throws IOException {
+         if (connection != null) {
+             return connection;
+         }
+         connection = connectionFactory.newConnection();
+         return connection;
+     }
+ 
+     /**
+      * Returns the channel used by this test, creating it on the connection from
+      * {@link #openConnection()} the first time it is requested.
+      *
+      * @return the cached channel
+      * @throws IOException if the connection or the channel cannot be created
+      */
+     private Channel openChannel() throws IOException {
+         if (channel != null) {
+             return channel;
+         }
+         channel = openConnection().createChannel();
+         return channel;
+     }
+ 
+     @Before
+     public void bindQueueExchange() throws IOException {
+         openChannel();
+         channel.exchangeDeclare("ex2", "direct", true, false, null);
+         channel.queueDeclare("q2", true, false, false, null);
+         channel.queueBind("q2", "ex2", "rk2");
+     }
+ 
+     /**
+      * Closes the channel and then the connection opened for the test, if any.
+      * <p>
+      * Cleanup is best-effort: an {@link IOException} raised while closing is deliberately ignored so
+      * that it cannot mask the outcome of the test itself. The connection is closed in a
+      * {@code finally} block so that it is still released when closing the channel fails with an
+      * unchecked exception.
+      */
+     @After
+     public void closeConnection() {
+         try {
+             if (channel != null) {
+                 try {
+                     channel.close();
+                 } catch (IOException ignored) {
+                     // Best-effort cleanup: nothing useful can be done if the channel fails to close.
+                 }
+             }
+         } finally {
+             if (connection != null) {
+                 try {
+                     connection.close();
+                 } catch (IOException ignored) {
+                     // Best-effort cleanup: nothing useful can be done if the connection fails to close.
+                 }
+             }
+         }
+     }
+ 
 -    private static class LastDeliveryConsumer extends DefaultConsumer {
++    /**
++     * Consumer that remembers the body of the most recent delivery so the test can inspect it.
++     */
++    private static final class LastDeliveryConsumer extends DefaultConsumer {
+         // volatile: written from the delivery callback and polled from the test method; the RabbitMQ
+         // client is expected to invoke the callback on its own thread, so the write must be made
+         // visible to the polling thread.
+         private volatile byte[] lastBody;
+ 
+         private LastDeliveryConsumer(Channel channel) {
+             super(channel);
+         }
+ 
+         /**
+          * Records the delivered body, then delegates to the default implementation.
+          */
+         @Override
+         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+             lastBody = body;
+             super.handleDelivery(consumerTag, envelope, properties, body);
+         }
+ 
+         /**
+          * @return the body of the last delivery, or {@code null} if nothing has been delivered yet
+          */
+         public byte[] getLastBody() {
+             return lastBody;
+         }
+     }
+ 
+     /**
+      * Sends a message through the {@code direct:rabbitMQ} producer template with routing key
+      * {@code rk2} and checks that the same body can be consumed from queue {@code q2}.
+      * <p>
+      * The queue is polled once per second for at most ten seconds. If nothing arrives the test fails
+      * with an explicit message instead of a {@link NullPointerException}.
+      */
+     @Test
+     public void testSendCsutomConnectionFactory() throws Exception {
+         String body = "Hello Rabbit";
+         template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+ 
+         openChannel();
+         LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+         channel.basicConsume("q2", true, consumer);
+         int i = 10;
+         while (consumer.getLastBody() == null && i > 0) {
+             Thread.sleep(1000L);
+             i--;
+         }
+         byte[] received = consumer.getLastBody();
+         if (received == null) {
+             throw new AssertionError("No message received from queue q2 within 10 seconds");
+         }
+         // Decode with an explicit charset so the comparison does not depend on the platform default.
+         assertEquals(body, new String(received, "UTF-8"));
+     }
+ }