You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/30 17:15:05 UTC
[1/2] git commit: CAMEL-7556 Multiple concurrent consumer threads
Repository: camel
Updated Branches:
refs/heads/master 6faf7f403 -> 28a8d00d3
CAMEL-7556 Multiple concurrent consumer threads
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3c4f8331
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3c4f8331
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3c4f8331
Branch: refs/heads/master
Commit: 3c4f8331bff02b5098187475e5de3f419d7dd718
Parents: 6faf7f4
Author: Gerald Quintana <ge...@zenika.com>
Authored: Sun Jun 29 18:12:42 2014 +0200
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jun 30 21:39:05 2014 +0800
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 106 +++++++++++++------
.../component/rabbitmq/RabbitMQEndpoint.java | 15 ++-
.../component/rabbitmq/RabbitMQLoadIntTest.java | 101 ++++++++++++++++++
3 files changed, 186 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 0f1d85f..91da43b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +27,7 @@ import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -32,17 +35,18 @@ import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
ExecutorService executor;
Connection conn;
- Channel channel;
-
private int closeTimeout = 30 * 1000;
-
private final RabbitMQEndpoint endpoint;
/**
* Task in charge of starting consumer
*/
private StartConsumerCallable startConsumerCallable;
+ /**
+ * Running consumers
+ */
+ private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
- public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+ public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@@ -54,39 +58,58 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * Open connection and channel
+ * Open connection
*/
- private void openConnectionAndChannel() throws IOException {
+ private void openConnection() throws IOException {
log.trace("Creating connection...");
this.conn = getEndpoint().connect(executor);
log.debug("Created connection: {}", conn);
-
- log.trace("Creating channel...");
- this.channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
- // setup the basicQos
- if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(),
- endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
- }
- getEndpoint().declareExchangeAndQueue(channel);
}
- /**
- * If needed, create Exchange and Queue, then add message listener
+ /**
+ * Open channel
+ */
+ private Channel openChannel() throws IOException {
+ log.trace("Creating channel...");
+ Channel channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
+ // setup the basicQos
+ if (endpoint.isPrefetchEnabled()) {
+ channel.basicQos(endpoint.getPrefetchSize(),
+ endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+ }
+ return channel;
+ }
+ /**
+ * Add a consummer thread for given channel
+ */
+ private void startConsumers() throws IOException {
+ // First channel used to declare Exchange and Queue
+ Channel channel=openChannel();
+ endpoint.declareExchangeAndQueue(channel);
+ startConsumer(channel);
+ // Other channels
+ for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
+ channel=openChannel();
+ startConsumer(channel);
+ }
+ }
+ /**
+ * Add a consummer thread for given channel
*/
- private void addConsumer() throws IOException {
- channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
- new RabbitConsumer(this, channel));
- }
+ private void startConsumer(Channel channel) throws IOException {
+ RabbitConsumer consumer = new RabbitConsumer(this, channel);
+ consumer.start();
+ this.consumers.add(consumer);
+ }
@Override
protected void doStart() throws Exception {
executor = endpoint.createExecutor();
log.debug("Using executor {}", executor);
try {
- openConnectionAndChannel();
- addConsumer();
+ openConnection();
+ startConsumers();
} catch (Exception e) {
// Open connection, and start message listener in background
Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
@@ -97,17 +120,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * If needed, close Connection and Channel
+ * If needed, close Connection and Channels
*/
private void closeConnectionAndChannel() throws IOException {
if (startConsumerCallable != null) {
startConsumerCallable.stop();
}
- if (channel != null) {
- log.debug("Closing channel: {}", channel);
- channel.close();
- channel = null;
- }
+ for(RabbitConsumer consumer: this.consumers) {
+ consumer.stop();
+ }
+ this.consumers.clear();
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
conn.close(closeTimeout);
@@ -133,7 +155,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final RabbitMQConsumer consumer;
private final Channel channel;
-
+ private String tag;
/**
* Constructs a new instance and records its association to the
* passed-in channel.
@@ -211,7 +233,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
}
- }
+ /**
+ * Bind consumer to channel
+ */
+ public void start() throws IOException {
+ tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+ }
+
+ /**
+ * Unbind consumer from channel
+ */
+ public void stop() throws IOException{
+ if (tag!=null) {
+ channel.basicCancel(tag);
+ }
+ channel.close();
+ }
+ }
/**
* Task in charge of opening connection and adding listener when consumer is started
@@ -233,7 +271,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
// Reconnection loop
while (running.get() && connectionFailed) {
try {
- openConnectionAndChannel();
+ openConnection();
connectionFailed = false;
} catch (Exception e) {
log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
@@ -241,7 +279,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
}
if (!connectionFailed) {
- addConsumer();
+ startConsumers();
}
stop();
return null;
http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index e475819..4ee6a7c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,8 +81,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private int prefetchCount;
//Default value in RabbitMQ is false.
private boolean prefetchGlobal;
-
- public RabbitMQEndpoint() {
+ /**
+ * Number of concurrent consumer threads
+ */
+ private int concurrentConsumers = 1;
+ public RabbitMQEndpoint() {
}
public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -460,4 +463,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public boolean isPrefetchGlobal() {
return prefetchGlobal;
}
+
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
+
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
new file mode 100644
index 0000000..adf1367
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
+ * consumers
+ */
+public class RabbitMQLoadIntTest extends CamelTestSupport {
+ private static final int PRODUCER_COUNT=10;
+ private static final int CONSUMER_COUNT=10;
+ private static final int MESSAGE_COUNT=100;
+ public static final String ROUTING_KEY = "rk4";
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directProducer;
+
+ @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+ + "&queue=q4&routingKey="+ROUTING_KEY
+ +"&threadPoolSize="+(CONSUMER_COUNT+5)
+ +"&concurrentConsumers="+CONSUMER_COUNT)
+ private Endpoint rabbitMQEndpoint;
+
+ @EndpointInject(uri = "mock:producing")
+ private MockEndpoint producingMockEndpoint;
+
+ @EndpointInject(uri = "mock:consuming")
+ private MockEndpoint consumingMockEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:rabbitMQ")
+ .id("producingRoute")
+ .log("Sending message")
+ .inOnly(rabbitMQEndpoint)
+ .to(producingMockEndpoint);
+ from(rabbitMQEndpoint)
+ .id("consumingRoute")
+ .log("Receiving message")
+ .to(consumingMockEndpoint);
+ }
+ };
+ }
+
+ @Test
+ public void testSendEndReceive() throws Exception {
+ // Start producers
+ ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
+ List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
+ for(int i = 0 ; i < PRODUCER_COUNT; i++) {
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
+ }
+ }
+ }));
+ }
+ // Wait for producers to end
+ for(Future future:futures) {
+ future.get(5, TimeUnit.SECONDS);
+ }
+ // Check message count
+ producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+ consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+ assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+ }
+}
[2/2] git commit: CAMEL-7556 Fixed bunch of CS errors
Posted by ni...@apache.org.
CAMEL-7556 Fixed bunch of CS errors
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28a8d00d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28a8d00d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28a8d00d
Branch: refs/heads/master
Commit: 28a8d00d3b37543f91e60ee9b5b956ca4f90e000
Parents: 3c4f833
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Jun 30 21:43:28 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jun 30 21:43:28 2014 +0800
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 120 ++++++++++---------
.../component/rabbitmq/RabbitMQEndpoint.java | 23 ++--
.../component/rabbitmq/RabbitMQLoadIntTest.java | 70 +++++------
3 files changed, 109 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 91da43b..2539b14 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -41,12 +41,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
* Task in charge of starting consumer
*/
private StartConsumerCallable startConsumerCallable;
- /**
- * Running consumers
- */
- private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
+ /**
+ * Running consumers
+ */
+ private final List<RabbitConsumer> consumers = new ArrayList<RabbitConsumer>();
- public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+ public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@@ -66,42 +66,44 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.debug("Created connection: {}", conn);
}
- /**
- * Open channel
- */
- private Channel openChannel() throws IOException {
- log.trace("Creating channel...");
- Channel channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
- // setup the basicQos
- if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(),
- endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
- }
- return channel;
- }
- /**
- * Add a consummer thread for given channel
- */
- private void startConsumers() throws IOException {
- // First channel used to declare Exchange and Queue
- Channel channel=openChannel();
- endpoint.declareExchangeAndQueue(channel);
- startConsumer(channel);
- // Other channels
- for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
- channel=openChannel();
- startConsumer(channel);
- }
- }
- /**
+ /**
+ * Open channel
+ */
+ private Channel openChannel() throws IOException {
+ log.trace("Creating channel...");
+ Channel channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
+ // setup the basicQos
+ if (endpoint.isPrefetchEnabled()) {
+ channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
+ endpoint.isPrefetchGlobal());
+ }
+ return channel;
+ }
+
+ /**
+ * Open one channel per configured concurrent consumer and start a consumer on each
+ */
+ private void startConsumers() throws IOException {
+ // First channel used to declare Exchange and Queue
+ Channel channel = openChannel();
+ endpoint.declareExchangeAndQueue(channel);
+ startConsumer(channel);
+ // Other channels
+ for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) {
+ channel = openChannel();
+ startConsumer(channel);
+ }
+ }
+
+ /**
* Add a consummer thread for given channel
*/
private void startConsumer(Channel channel) throws IOException {
- RabbitConsumer consumer = new RabbitConsumer(this, channel);
- consumer.start();
- this.consumers.add(consumer);
- }
+ RabbitConsumer consumer = new RabbitConsumer(this, channel);
+ consumer.start();
+ this.consumers.add(consumer);
+ }
@Override
protected void doStart() throws Exception {
@@ -126,10 +128,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (startConsumerCallable != null) {
startConsumerCallable.stop();
}
- for(RabbitConsumer consumer: this.consumers) {
- consumer.stop();
- }
- this.consumers.clear();
+ for (RabbitConsumer consumer : this.consumers) {
+ consumer.stop();
+ }
+ this.consumers.clear();
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
conn.close(closeTimeout);
@@ -155,7 +157,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final RabbitMQConsumer consumer;
private final Channel channel;
- private String tag;
+ private String tag;
/**
* Constructs a new instance and records its association to the
* passed-in channel.
@@ -233,23 +235,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
}
- /**
- * Bind consumer to channel
- */
- public void start() throws IOException {
- tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
- }
+ /**
+ * Bind consumer to channel
+ */
+ public void start() throws IOException {
+ tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+ }
- /**
- * Unbind consumer from channel
- */
- public void stop() throws IOException{
- if (tag!=null) {
- channel.basicCancel(tag);
- }
- channel.close();
- }
- }
+ /**
+ * Unbind consumer from channel
+ */
+ public void stop() throws IOException {
+ if (tag != null) {
+ channel.basicCancel(tag);
+ }
+ channel.close();
+ }
+ }
/**
* Task in charge of opening connection and adding listener when consumer is started
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 4ee6a7c..d0484b1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,11 +81,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private int prefetchCount;
//Default value in RabbitMQ is false.
private boolean prefetchGlobal;
- /**
- * Number of concurrent consumer threads
- */
- private int concurrentConsumers = 1;
- public RabbitMQEndpoint() {
+ /**
+ * Number of concurrent consumer threads
+ */
+ private int concurrentConsumers = 1;
+
+ public RabbitMQEndpoint() {
}
public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -464,11 +465,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
return prefetchGlobal;
}
- public int getConcurrentConsumers() {
- return concurrentConsumers;
- }
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
- public void setConcurrentConsumers(int concurrentConsumers) {
- this.concurrentConsumers = concurrentConsumers;
- }
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
index adf1367..4e4e52b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -16,14 +16,6 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.AlreadyClosedException;
-import org.apache.camel.*;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -31,22 +23,31 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
/**
* Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
* consumers
*/
public class RabbitMQLoadIntTest extends CamelTestSupport {
- private static final int PRODUCER_COUNT=10;
- private static final int CONSUMER_COUNT=10;
- private static final int MESSAGE_COUNT=100;
- public static final String ROUTING_KEY = "rk4";
- @Produce(uri = "direct:rabbitMQ")
+ public static final String ROUTING_KEY = "rk4";
+ private static final int PRODUCER_COUNT = 10;
+ private static final int CONSUMER_COUNT = 10;
+ private static final int MESSAGE_COUNT = 100;
+
+ @Produce(uri = "direct:rabbitMQ")
protected ProducerTemplate directProducer;
@EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&queue=q4&routingKey="+ROUTING_KEY
- +"&threadPoolSize="+(CONSUMER_COUNT+5)
- +"&concurrentConsumers="+CONSUMER_COUNT)
+ + "&queue=q4&routingKey=" + ROUTING_KEY + "&threadPoolSize=" + (CONSUMER_COUNT + 5)
+ + "&concurrentConsumers=" + CONSUMER_COUNT)
private Endpoint rabbitMQEndpoint;
@EndpointInject(uri = "mock:producing")
@@ -76,24 +77,25 @@ public class RabbitMQLoadIntTest extends CamelTestSupport {
@Test
public void testSendEndReceive() throws Exception {
- // Start producers
- ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
- List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
- for(int i = 0 ; i < PRODUCER_COUNT; i++) {
- futures.add(executorService.submit(new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
- }
- }
- }));
- }
- // Wait for producers to end
- for(Future future:futures) {
- future.get(5, TimeUnit.SECONDS);
- }
- // Check message count
+ // Start producers
+ ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT);
+ List<Future> futures = new ArrayList<Future>(PRODUCER_COUNT);
+ for (int i = 0; i < PRODUCER_COUNT; i++) {
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY,
+ ROUTING_KEY);
+ }
+ }
+ }));
+ }
+ // Wait for producers to end
+ for (Future future : futures) {
+ future.get(5, TimeUnit.SECONDS);
+ }
+ // Check message count
producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);