You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/30 17:15:06 UTC
[2/2] git commit: CAMEL-7556 Fixed bunch of CS errors
CAMEL-7556 Fixed bunch of CS errors
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/28a8d00d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/28a8d00d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/28a8d00d
Branch: refs/heads/master
Commit: 28a8d00d3b37543f91e60ee9b5b956ca4f90e000
Parents: 3c4f833
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Jun 30 21:43:28 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Jun 30 21:43:28 2014 +0800
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 120 ++++++++++---------
.../component/rabbitmq/RabbitMQEndpoint.java | 23 ++--
.../component/rabbitmq/RabbitMQLoadIntTest.java | 70 +++++------
3 files changed, 109 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 91da43b..2539b14 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -41,12 +41,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
* Task in charge of starting consumer
*/
private StartConsumerCallable startConsumerCallable;
- /**
- * Running consumers
- */
- private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
+ /**
+ * Running consumers
+ */
+ private final List<RabbitConsumer> consumers = new ArrayList<RabbitConsumer>();
- public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+ public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
@@ -66,42 +66,44 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.debug("Created connection: {}", conn);
}
- /**
- * Open channel
- */
- private Channel openChannel() throws IOException {
- log.trace("Creating channel...");
- Channel channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
- // setup the basicQos
- if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(),
- endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
- }
- return channel;
- }
- /**
- * Add a consummer thread for given channel
- */
- private void startConsumers() throws IOException {
- // First channel used to declare Exchange and Queue
- Channel channel=openChannel();
- endpoint.declareExchangeAndQueue(channel);
- startConsumer(channel);
- // Other channels
- for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
- channel=openChannel();
- startConsumer(channel);
- }
- }
- /**
+ /**
+ * Open channel
+ */
+ private Channel openChannel() throws IOException {
+ log.trace("Creating channel...");
+ Channel channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
+ // setup the basicQos
+ if (endpoint.isPrefetchEnabled()) {
+ channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
+ endpoint.isPrefetchGlobal());
+ }
+ return channel;
+ }
+
+ /**
+ * Add a consumer thread for given channel
+ */
+ private void startConsumers() throws IOException {
+ // First channel used to declare Exchange and Queue
+ Channel channel = openChannel();
+ endpoint.declareExchangeAndQueue(channel);
+ startConsumer(channel);
+ // Other channels
+ for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) {
+ channel = openChannel();
+ startConsumer(channel);
+ }
+ }
+
+ /**
* Add a consummer thread for given channel
*/
private void startConsumer(Channel channel) throws IOException {
- RabbitConsumer consumer = new RabbitConsumer(this, channel);
- consumer.start();
- this.consumers.add(consumer);
- }
+ RabbitConsumer consumer = new RabbitConsumer(this, channel);
+ consumer.start();
+ this.consumers.add(consumer);
+ }
@Override
protected void doStart() throws Exception {
@@ -126,10 +128,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (startConsumerCallable != null) {
startConsumerCallable.stop();
}
- for(RabbitConsumer consumer: this.consumers) {
- consumer.stop();
- }
- this.consumers.clear();
+ for (RabbitConsumer consumer : this.consumers) {
+ consumer.stop();
+ }
+ this.consumers.clear();
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
conn.close(closeTimeout);
@@ -155,7 +157,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final RabbitMQConsumer consumer;
private final Channel channel;
- private String tag;
+ private String tag;
/**
* Constructs a new instance and records its association to the
* passed-in channel.
@@ -233,23 +235,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
}
- /**
- * Bind consumer to channel
- */
- public void start() throws IOException {
- tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
- }
+ /**
+ * Bind consumer to channel
+ */
+ public void start() throws IOException {
+ tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+ }
- /**
- * Unbind consumer from channel
- */
- public void stop() throws IOException{
- if (tag!=null) {
- channel.basicCancel(tag);
- }
- channel.close();
- }
- }
+ /**
+ * Unbind consumer from channel
+ */
+ public void stop() throws IOException {
+ if (tag != null) {
+ channel.basicCancel(tag);
+ }
+ channel.close();
+ }
+ }
/**
* Task in charge of opening connection and adding listener when consumer is started
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 4ee6a7c..d0484b1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,11 +81,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private int prefetchCount;
//Default value in RabbitMQ is false.
private boolean prefetchGlobal;
- /**
- * Number of concurrent consumer threads
- */
- private int concurrentConsumers = 1;
- public RabbitMQEndpoint() {
+ /**
+ * Number of concurrent consumer threads
+ */
+ private int concurrentConsumers = 1;
+
+ public RabbitMQEndpoint() {
}
public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -464,11 +465,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
return prefetchGlobal;
}
- public int getConcurrentConsumers() {
- return concurrentConsumers;
- }
+ public int getConcurrentConsumers() {
+ return concurrentConsumers;
+ }
- public void setConcurrentConsumers(int concurrentConsumers) {
- this.concurrentConsumers = concurrentConsumers;
- }
+ public void setConcurrentConsumers(int concurrentConsumers) {
+ this.concurrentConsumers = concurrentConsumers;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/28a8d00d/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
index adf1367..4e4e52b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -16,14 +16,6 @@
*/
package org.apache.camel.component.rabbitmq;
-import com.rabbitmq.client.AlreadyClosedException;
-import org.apache.camel.*;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -31,22 +23,31 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
/**
* Integration test to check that RabbitMQ Endpoint is able handle heavy load using multiple producers and
* consumers
*/
public class RabbitMQLoadIntTest extends CamelTestSupport {
- private static final int PRODUCER_COUNT=10;
- private static final int CONSUMER_COUNT=10;
- private static final int MESSAGE_COUNT=100;
- public static final String ROUTING_KEY = "rk4";
- @Produce(uri = "direct:rabbitMQ")
+ public static final String ROUTING_KEY = "rk4";
+ private static final int PRODUCER_COUNT = 10;
+ private static final int CONSUMER_COUNT = 10;
+ private static final int MESSAGE_COUNT = 100;
+
+ @Produce(uri = "direct:rabbitMQ")
protected ProducerTemplate directProducer;
@EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
- + "&queue=q4&routingKey="+ROUTING_KEY
- +"&threadPoolSize="+(CONSUMER_COUNT+5)
- +"&concurrentConsumers="+CONSUMER_COUNT)
+ + "&queue=q4&routingKey=" + ROUTING_KEY + "&threadPoolSize=" + (CONSUMER_COUNT + 5)
+ + "&concurrentConsumers=" + CONSUMER_COUNT)
private Endpoint rabbitMQEndpoint;
@EndpointInject(uri = "mock:producing")
@@ -76,24 +77,25 @@ public class RabbitMQLoadIntTest extends CamelTestSupport {
@Test
public void testSendEndReceive() throws Exception {
- // Start producers
- ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
- List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
- for(int i = 0 ; i < PRODUCER_COUNT; i++) {
- futures.add(executorService.submit(new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
- }
- }
- }));
- }
- // Wait for producers to end
- for(Future future:futures) {
- future.get(5, TimeUnit.SECONDS);
- }
- // Check message count
+ // Start producers
+ ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_COUNT);
+ List<Future> futures = new ArrayList<Future>(PRODUCER_COUNT);
+ for (int i = 0; i < PRODUCER_COUNT; i++) {
+ futures.add(executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY,
+ ROUTING_KEY);
+ }
+ }
+ }));
+ }
+ // Wait for producers to end
+ for (Future future : futures) {
+ future.get(5, TimeUnit.SECONDS);
+ }
+ // Check message count
producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);