You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/30 17:15:05 UTC

[1/2] git commit: CAMEL-7556 Multiple concurrent consumer threads

Repository: camel
Updated Branches:
  refs/heads/master 6faf7f403 -> 28a8d00d3


CAMEL-7556 Multiple concurrent consumer threads


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3c4f8331
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3c4f8331
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3c4f8331

Branch: refs/heads/master
Commit: 3c4f8331bff02b5098187475e5de3f419d7dd718
Parents: 6faf7f4
Author: Gerald Quintana <ge...@zenika.com>
Authored: Sun Jun 29 18:12:42 2014 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jun 30 21:39:05 2014 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 106 +++++++++++++------
 .../component/rabbitmq/RabbitMQEndpoint.java    |  15 ++-
 .../component/rabbitmq/RabbitMQLoadIntTest.java | 101 ++++++++++++++++++
 3 files changed, 186 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 0f1d85f..91da43b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +27,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -32,17 +35,18 @@ import org.apache.camel.impl.DefaultConsumer;
 public class RabbitMQConsumer extends DefaultConsumer {
     ExecutorService executor;
     Connection conn;
-    Channel channel;
-
     private int closeTimeout = 30 * 1000;
-    
     private final RabbitMQEndpoint endpoint;
     /**
      * Task in charge of starting consumer
      */
     private StartConsumerCallable startConsumerCallable;
+	/**
+	 * Running consumers
+	 */
+	private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
 
-    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+	public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
@@ -54,39 +58,58 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * Open connection and channel
+     * Open connection
      */
-    private void openConnectionAndChannel() throws IOException {
+    private void openConnection() throws IOException {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executor);
         log.debug("Created connection: {}", conn);
-
-        log.trace("Creating channel...");
-        this.channel = conn.createChannel();
-        log.debug("Created channel: {}", channel);
-        // setup the basicQos
-        if (endpoint.isPrefetchEnabled()) {
-            channel.basicQos(endpoint.getPrefetchSize(), 
-                             endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
-        }
-        getEndpoint().declareExchangeAndQueue(channel);
     }
 
-    /**
-     * If needed, create Exchange and Queue, then add message listener
+	/**
+	 * Open channel
+	 */
+	private Channel openChannel() throws IOException {
+		log.trace("Creating channel...");
+		Channel channel = conn.createChannel();
+		log.debug("Created channel: {}", channel);
+		// setup the basicQos
+		if (endpoint.isPrefetchEnabled()) {
+			channel.basicQos(endpoint.getPrefetchSize(),
+					endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+		}
+		return channel;
+	}
+	/**
+	 * Add a consummer thread for given channel
+	 */
+	private void startConsumers() throws IOException {
+		// First channel used to declare Exchange and Queue
+		Channel channel=openChannel();
+		endpoint.declareExchangeAndQueue(channel);
+		startConsumer(channel);
+		// Other channels
+		for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
+			channel=openChannel();
+			startConsumer(channel);
+		}
+	}
+	/**
+     * Add a consummer thread for given channel
      */
-    private void addConsumer() throws IOException {
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
-                new RabbitConsumer(this, channel));
-    }
+    private void startConsumer(Channel channel) throws IOException {
+		RabbitConsumer consumer = new RabbitConsumer(this, channel);
+		consumer.start();
+		this.consumers.add(consumer);
+	}
 
     @Override
     protected void doStart() throws Exception {
         executor = endpoint.createExecutor();
         log.debug("Using executor {}", executor);
         try {
-            openConnectionAndChannel();
-            addConsumer();
+            openConnection();
+            startConsumers();
         } catch (Exception e) {
             // Open connection, and start message listener in background
             Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
@@ -97,17 +120,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * If needed, close Connection and Channel
+     * If needed, close Connection and Channels
      */
     private void closeConnectionAndChannel() throws IOException {
         if (startConsumerCallable != null) {
             startConsumerCallable.stop();
         }
-        if (channel != null) {
-            log.debug("Closing channel: {}", channel);
-            channel.close();
-            channel = null;
-        }
+		for(RabbitConsumer consumer: this.consumers) {
+			consumer.stop();
+		}
+		this.consumers.clear();
         if (conn != null) {
             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
             conn.close(closeTimeout);
@@ -133,7 +155,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
         private final RabbitMQConsumer consumer;
         private final Channel channel;
-
+		private String tag;
         /**
          * Constructs a new instance and records its association to the
          * passed-in channel.
@@ -211,7 +233,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
             }
         }
 
-    }
+		/**
+		 * Bind consumer to channel
+		 */
+		public void start() throws IOException {
+			tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+		}
+
+		/**
+		 * Unbind consumer from channel
+		 */
+		public void stop() throws IOException{
+			if (tag!=null) {
+				channel.basicCancel(tag);
+			}
+			channel.close();
+		}
+	}
 
     /**
      * Task in charge of opening connection and adding listener when consumer is started
@@ -233,7 +271,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
             // Reconnection loop
             while (running.get() && connectionFailed) {
                 try {
-                    openConnectionAndChannel();
+                    openConnection();
                     connectionFailed = false;
                 } catch (Exception e) {
                     log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
@@ -241,7 +279,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 }
             }
             if (!connectionFailed) {
-                addConsumer();
+                startConsumers();
             }
             stop();
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index e475819..4ee6a7c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,8 +81,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private int prefetchCount;
     //Default value in RabbitMQ is false.
     private boolean prefetchGlobal;
-
-    public RabbitMQEndpoint() {
+	/**
+	 * Number of concurrent consumer threads
+	 */
+	private int concurrentConsumers = 1;
+	public RabbitMQEndpoint() {
     }
 
     public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -460,4 +463,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public boolean isPrefetchGlobal() {
         return prefetchGlobal;
     }
+
+	public int getConcurrentConsumers() {
+		return concurrentConsumers;
+	}
+
+	public void setConcurrentConsumers(int concurrentConsumers) {
+		this.concurrentConsumers = concurrentConsumers;
+	}
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
new file mode 100644
index 0000000..adf1367
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
+ * consumers
+ */
+public class RabbitMQLoadIntTest extends CamelTestSupport {
+	private static final int PRODUCER_COUNT=10;
+	private static final int CONSUMER_COUNT=10;
+	private static final int MESSAGE_COUNT=100;
+	public static final String ROUTING_KEY = "rk4";
+	@Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+                          + "&queue=q4&routingKey="+ROUTING_KEY
+			+"&threadPoolSize="+(CONSUMER_COUNT+5)
+			+"&concurrentConsumers="+CONSUMER_COUNT)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .to(consumingMockEndpoint);
+            }
+        };
+    }
+
+    @Test
+    public void testSendEndReceive() throws Exception {
+		// Start producers
+		ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
+		List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
+		for(int i = 0 ; i < PRODUCER_COUNT; i++) {
+			futures.add(executorService.submit(new Runnable() {
+				@Override
+				public void run() {
+					for (int i = 0; i < MESSAGE_COUNT; i++) {
+						directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
+					}
+				}
+			}));
+		}
+		// Wait for producers to end
+		for(Future future:futures) {
+			future.get(5, TimeUnit.SECONDS);
+		}
+		// Check message count
+        producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+        consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+}


[2/2] git commit: CAMEL-7556 Fixed bunch of CS errors

Posted by ni...@apache.org.
CAMEL-7556 Fixed bunch of CS errors


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28a8d00d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28a8d00d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28a8d00d

Branch: refs/heads/master
Commit: 28a8d00d3b37543f91e60ee9b5b956ca4f90e000
Parents: 3c4f833
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Jun 30 21:43:28 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jun 30 21:43:28 2014 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 120 ++++++++++---------
 .../component/rabbitmq/RabbitMQEndpoint.java    |  23 ++--
 .../component/rabbitmq/RabbitMQLoadIntTest.java |  70 +++++------
 3 files changed, 109 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 91da43b..2539b14 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -41,12 +41,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
      * Task in charge of starting consumer
      */
     private StartConsumerCallable startConsumerCallable;
-	/**
-	 * Running consumers
-	 */
-	private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
+    /**
+     * Running consumers
+     */
+    private final List<RabbitConsumer> consumers = new ArrayList<RabbitConsumer>();
 
-	public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
@@ -66,42 +66,44 @@ public class RabbitMQConsumer extends DefaultConsumer {
         log.debug("Created connection: {}", conn);
     }
 
-	/**
-	 * Open channel
-	 */
-	private Channel openChannel() throws IOException {
-		log.trace("Creating channel...");
-		Channel channel = conn.createChannel();
-		log.debug("Created channel: {}", channel);
-		// setup the basicQos
-		if (endpoint.isPrefetchEnabled()) {
-			channel.basicQos(endpoint.getPrefetchSize(),
-					endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
-		}
-		return channel;
-	}
-	/**
-	 * Add a consummer thread for given channel
-	 */
-	private void startConsumers() throws IOException {
-		// First channel used to declare Exchange and Queue
-		Channel channel=openChannel();
-		endpoint.declareExchangeAndQueue(channel);
-		startConsumer(channel);
-		// Other channels
-		for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
-			channel=openChannel();
-			startConsumer(channel);
-		}
-	}
-	/**
+    /**
+     * Open channel
+     */
+    private Channel openChannel() throws IOException {
+        log.trace("Creating channel...");
+        Channel channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+        // setup the basicQos
+        if (endpoint.isPrefetchEnabled()) {
+            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
+                             endpoint.isPrefetchGlobal());
+        }
+        return channel;
+    }
+
+    /**
+     * Start the configured consumers, each one on its own channel
+     */
+    private void startConsumers() throws IOException {
+        // First channel used to declare Exchange and Queue
+        Channel channel = openChannel();
+        endpoint.declareExchangeAndQueue(channel);
+        startConsumer(channel);
+        // Other channels
+        for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) {
+            channel = openChannel();
+            startConsumer(channel);
+        }
+    }
+
+    /**
      * Add a consummer thread for given channel
      */
     private void startConsumer(Channel channel) throws IOException {
-		RabbitConsumer consumer = new RabbitConsumer(this, channel);
-		consumer.start();
-		this.consumers.add(consumer);
-	}
+        RabbitConsumer consumer = new RabbitConsumer(this, channel);
+        consumer.start();
+        this.consumers.add(consumer);
+    }
 
     @Override
     protected void doStart() throws Exception {
@@ -126,10 +128,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
         if (startConsumerCallable != null) {
             startConsumerCallable.stop();
         }
-		for(RabbitConsumer consumer: this.consumers) {
-			consumer.stop();
-		}
-		this.consumers.clear();
+        for (RabbitConsumer consumer : this.consumers) {
+            consumer.stop();
+        }
+        this.consumers.clear();
         if (conn != null) {
             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
             conn.close(closeTimeout);
@@ -155,7 +157,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
         private final RabbitMQConsumer consumer;
         private final Channel channel;
-		private String tag;
+        private String tag;
         /**
          * Constructs a new instance and records its association to the
          * passed-in channel.
@@ -233,23 +235,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
             }
         }
 
-		/**
-		 * Bind consumer to channel
-		 */
-		public void start() throws IOException {
-			tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
-		}
+        /**
+         * Bind consumer to channel
+         */
+        public void start() throws IOException {
+            tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+        }
 
-		/**
-		 * Unbind consumer from channel
-		 */
-		public void stop() throws IOException{
-			if (tag!=null) {
-				channel.basicCancel(tag);
-			}
-			channel.close();
-		}
-	}
+        /**
+         * Unbind consumer from channel
+         */
+        public void stop() throws IOException {
+            if (tag != null) {
+                channel.basicCancel(tag);
+            }
+            channel.close();
+        }
+    }
 
     /**
      * Task in charge of opening connection and adding listener when consumer is started

http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 4ee6a7c..d0484b1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,11 +81,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private int prefetchCount;
     //Default value in RabbitMQ is false.
     private boolean prefetchGlobal;
-	/**
-	 * Number of concurrent consumer threads
-	 */
-	private int concurrentConsumers = 1;
-	public RabbitMQEndpoint() {
+    /**
+     * Number of concurrent consumer threads
+     */
+    private int concurrentConsumers = 1;
+
+    public RabbitMQEndpoint() {
     }
 
     public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -464,11 +465,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         return prefetchGlobal;
     }
 
-	public int getConcurrentConsumers() {
-		return concurrentConsumers;
-	}
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
 
-	public void setConcurrentConsumers(int concurrentConsumers) {
-		this.concurrentConsumers = concurrentConsumers;
-	}
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
index adf1367..4e4e52b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -16,14 +16,6 @@
  */
 package org.apache.camel.component.rabbitmq;
 
-import com.rabbitmq.client.AlreadyClosedException;
-import org.apache.camel.*;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -31,22 +23,31 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
 /**
  * Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
  * consumers
  */
 public class RabbitMQLoadIntTest extends CamelTestSupport {
-	private static final int PRODUCER_COUNT=10;
-	private static final int CONSUMER_COUNT=10;
-	private static final int MESSAGE_COUNT=100;
-	public static final String ROUTING_KEY = "rk4";
-	@Produce(uri = "direct:rabbitMQ")
+    public static final String ROUTING_KEY = "rk4";
+    private static final int PRODUCER_COUNT = 10;
+    private static final int CONSUMER_COUNT = 10;
+    private static final int MESSAGE_COUNT = 100;
+    
+    @Produce(uri = "direct:rabbitMQ")
     protected ProducerTemplate directProducer;
 
     @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
-                          + "&queue=q4&routingKey="+ROUTING_KEY
-			+"&threadPoolSize="+(CONSUMER_COUNT+5)
-			+"&concurrentConsumers="+CONSUMER_COUNT)
+                          + "&queue=q4&routingKey=" + ROUTING_KEY + "&threadPoolSize=" + (CONSUMER_COUNT + 5)
+                          + "&concurrentConsumers=" + CONSUMER_COUNT)
     private Endpoint rabbitMQEndpoint;
 
     @EndpointInject(uri = "mock:producing")
@@ -76,24 +77,25 @@ public class RabbitMQLoadIntTest extends CamelTestSupport {
 
     @Test
     public void testSendEndReceive() throws Exception {
-		// Start producers
-		ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
-		List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
-		for(int i = 0 ; i < PRODUCER_COUNT; i++) {
-			futures.add(executorService.submit(new Runnable() {
-				@Override
-				public void run() {
-					for (int i = 0; i < MESSAGE_COUNT; i++) {
-						directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
-					}
-				}
-			}));
-		}
-		// Wait for producers to end
-		for(Future future:futures) {
-			future.get(5, TimeUnit.SECONDS);
-		}
-		// Check message count
+        // Start producers
+        ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT);
+        List<Future> futures = new ArrayList<Future>(PRODUCER_COUNT);
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            futures.add(executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    for (int i = 0; i < MESSAGE_COUNT; i++) {
+                        directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY,
+                                                         ROUTING_KEY);
+                    }
+                }
+            }));
+        }
+        // Wait for producers to end
+        for (Future future : futures) {
+            future.get(5, TimeUnit.SECONDS);
+        }
+        // Check message count
         producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
         consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
         assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);