You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/06 07:02:54 UTC
[1/4] git commit: RabbitMQ - Spring DSL integration test
Repository: camel
Updated Branches:
refs/heads/master 69b00a31f -> 686d5fcca
RabbitMQ - Spring DSL integration test
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0762be7e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0762be7e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0762be7e
Branch: refs/heads/master
Commit: 0762be7e581fce84e761442b8eb3abfe23be2af1
Parents: 2b0513d
Author: Gerald Quintana <ge...@zenika.com>
Authored: Mon May 5 15:43:20 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Mon May 5 15:44:03 2014 +0200
----------------------------------------------------------------------
components/camel-rabbitmq/pom.xml | 17 ++--
.../rabbitmq/RabbitMQSpringIntTest.java | 102 +++++++++++++++++++
.../rabbitmq/RabbitMQSpringIntTest-context.xml | 40 ++++++++
3 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 613025e..4e14471 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -48,12 +48,17 @@
</dependency>
<!-- testing -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-xml</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
new file mode 100644
index 0000000..baa4bde
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@ -0,0 +1,102 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test RabbitMQ component with Spring DSL
+ */
+@RunWith(CamelSpringJUnit4ClassRunner.class)
+@ContextConfiguration("RabbitMQSpringIntTest-context.xml")
+public class RabbitMQSpringIntTest {
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate template;
+ @Autowired
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+
+ private Connection openConnection() throws IOException {
+ if (connection == null) {
+ connection = connectionFactory.newConnection();
+ }
+ return connection;
+ }
+
+ private Channel openChannel() throws IOException {
+ if (channel == null) {
+ channel = openConnection().createChannel();
+ }
+ return channel;
+ }
+
+ @Before
+ public void bindQueueExchange() throws IOException {
+ openChannel();
+ channel.exchangeDeclare("ex2", "direct", true, false, null);
+ channel.queueDeclare("q2", true, false, false, null);
+ channel.queueBind("q2", "ex2", "rk2");
+ }
+
+ @After
+ public void closeConnection() {
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ private static class LastDeliveryConsumer extends DefaultConsumer {
+ private byte[] lastBody;
+
+ private LastDeliveryConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ lastBody = body;
+ super.handleDelivery(consumerTag, envelope, properties, body);
+ }
+
+ public byte[] getLastBody() {
+ return lastBody;
+ }
+ }
+
+ @Test
+ public void testSendCsutomConnectionFactory() throws Exception {
+ String body = "Hello Rabbit";
+ template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+ openChannel();
+ LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+ channel.basicConsume("q2", true, consumer);
+ int i = 10;
+ while (consumer.getLastBody() == null && i > 0) {
+ Thread.sleep(1000L);
+ i--;
+ }
+ assertEquals(body, new String(consumer.getLastBody()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0762be7e/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
new file mode 100644
index 0000000..6810583
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/resources/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest-context.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <!-- START SNIPPET: custom connection factory -->
+ <bean id="customConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
+ <property name="host" value="localhost"/>
+ <property name="port" value="5672"/>
+ <property name="username" value="cameltest"/>
+ <property name="password" value="cameltest"/>
+ </bean>
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:rabbitMQ"/>
+ <to uri="rabbitmq://localhost:5672/ex2?connectionFactory=#customConnectionFactory&queue=q2"/>
+ </route>
+ </camelContext>
+ <!-- END SNIPPET: custom connection factory -->
+</beans>
\ No newline at end of file
[2/4] git commit: Reconnect RabbitMQ Producer/Consumer after broker
was unavailable
Posted by ni...@apache.org.
Reconnect RabbitMQ Producer/Consumer after broker was unavailable
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1191e19f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1191e19f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1191e19f
Branch: refs/heads/master
Commit: 1191e19fc194b545931530011511a55a8d6d6431
Parents: 0762be7
Author: Gerald Quintana <ge...@zenika.com>
Authored: Mon May 5 15:47:01 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Mon May 5 15:47:01 2014 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 112 +++++++++++++++----
.../component/rabbitmq/RabbitMQEndpoint.java | 22 ++++
.../component/rabbitmq/RabbitMQProducer.java | 33 +++++-
.../rabbitmq/RabbitMQEndpointTest.java | 30 +++--
.../rabbitmq/RabbitMQReConnectionIntTest.java | 77 +++++++++++++
5 files changed, 232 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 501f2a1..0bdc93b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,8 +17,9 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
@@ -36,6 +37,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
private int closeTimeout = 30 * 1000;
private final RabbitMQEndpoint endpoint;
+ /**
+ * Task in charge of starting consumer
+ */
+ private StartConsumerCallable startConsumerCallable;
public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -43,37 +48,56 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
- protected void doStart() throws Exception {
- executor = endpoint.createExecutor();
- log.debug("Using executor {}", executor);
-
- conn = endpoint.connect(executor);
- log.debug("Using conn {}", conn);
-
- channel = conn.createChannel();
- log.debug("Using channel {}", channel);
+ public RabbitMQEndpoint getEndpoint() {
+ return (RabbitMQEndpoint) super.getEndpoint();
+ }
- channel.exchangeDeclare(endpoint.getExchangeName(),
- endpoint.getExchangeType(),
- endpoint.isDurable(),
- endpoint.isAutoDelete(),
- new HashMap<String, Object>());
+ /**
+ * Open connection and channel
+ */
+ private void openConnectionAndChannel() throws IOException {
+ log.trace("Creating connection...");
+ this.conn = getEndpoint().connect(executor);
+ log.debug("Created connection: {}", conn);
+
+ log.trace("Creating channel...");
+ this.channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
+ }
- // need to make sure the queueDeclare is same with the exchange declare
- channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
- endpoint.isAutoDelete(), null);
- channel.queueBind(
- endpoint.getQueue(),
- endpoint.getExchangeName(),
- endpoint.getRoutingKey() == null ? "" : endpoint
- .getRoutingKey());
+ /**
+ * If needed, create Exchange and Queue, then add message listener
+ */
+ private void addConsumer() throws IOException {
+ getEndpoint().declareExchangeAndQueue(channel);
channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
new RabbitConsumer(this, channel));
}
@Override
- protected void doStop() throws Exception {
+ protected void doStart() throws Exception {
+ executor = endpoint.createExecutor();
+ log.debug("Using executor {}", executor);
+ try {
+ openConnectionAndChannel();
+ addConsumer();
+ } catch (Exception e) {
+ // Open connection, and start message listener in background
+ Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+ final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L;
+ startConsumerCallable=new StartConsumerCallable(connectionRetryInterval);
+ executor.submit(startConsumerCallable);
+ }
+ }
+
+ /**
+ * If needed, close Connection and Channel
+ */
+ private void closeConnectionAndChannel() throws IOException {
+ if (startConsumerCallable!=null) {
+ startConsumerCallable.stop();
+ }
if (channel != null) {
log.debug("Closing channel: {}", channel);
channel.close();
@@ -84,6 +108,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
conn.close(closeTimeout);
conn = null;
}
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ closeConnectionAndChannel();
+
if (executor != null) {
if (endpoint != null && endpoint.getCamelContext() != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -178,4 +208,38 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
+ /**
+ * Task in charge of opening connection and adding listener when consumer is started
+ * and broker is not available.
+ */
+ private class StartConsumerCallable implements Callable<Void> {
+ private final long connectionRetryInterval;
+ private final AtomicBoolean running=new AtomicBoolean(true);
+ public StartConsumerCallable(long connectionRetryInterval) {
+ this.connectionRetryInterval = connectionRetryInterval;
+ }
+ public void stop() {
+ running.set(false);
+ RabbitMQConsumer.this.startConsumerCallable=null;
+ }
+ @Override
+ public Void call() throws Exception {
+ boolean connectionFailed=true;
+ // Reconnection loop
+ while (running.get() && connectionFailed) {
+ try {
+ openConnectionAndChannel();
+ connectionFailed=false;
+ } catch (Exception e) {
+ log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e);
+ Thread.sleep(connectionRetryInterval);
+ }
+ }
+ if (!connectionFailed) {
+ addConsumer();
+ }
+ stop();
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 646a633..7f87f53 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -29,6 +30,7 @@ import javax.net.ssl.TrustManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
@@ -126,6 +128,26 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
}
}
+ /**
+ * If needed, declare Exchange, declare Queue and bind them with Routing Key
+ */
+ public void declareExchangeAndQueue(Channel channel) throws IOException {
+ channel.exchangeDeclare(getExchangeName(),
+ getExchangeType(),
+ isDurable(),
+ isAutoDelete(),
+ new HashMap<String, Object>());
+ if (getQueue()!=null) {
+ // need to make sure the queueDeclare is same with the exchange declare
+ channel.queueDeclare(getQueue(), isDurable(), false,
+ isAutoDelete(), null);
+ channel.queueBind(
+ getQueue(),
+ getExchangeName(),
+ getRoutingKey() == null ? "" : getRoutingKey());
+ }
+ }
+
private ConnectionFactory getOrCreateConnectionFactory() {
if (connectionFactory == null) {
ConnectionFactory factory = new ConnectionFactory();
http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 7763423..03177e2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -45,11 +45,10 @@ public class RabbitMQProducer extends DefaultProducer {
public RabbitMQEndpoint getEndpoint() {
return (RabbitMQEndpoint) super.getEndpoint();
}
-
- @Override
- protected void doStart() throws Exception {
- this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
-
+ /**
+ * Open connection and channel
+ */
+ private void openConnectionAndChannel() throws IOException {
log.trace("Creating connection...");
this.conn = getEndpoint().connect(executorService);
log.debug("Created connection: {}", conn);
@@ -60,7 +59,20 @@ public class RabbitMQProducer extends DefaultProducer {
}
@Override
- protected void doStop() throws Exception {
+ protected void doStart() throws Exception {
+ this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
+
+ try {
+ openConnectionAndChannel();
+ } catch (IOException e) {
+ log.warn("Failed to create connection", e);
+ }
+ }
+
+ /**
+ * If needed, close Connection and Channel
+ */
+ private void closeConnectionAndChannel() throws IOException {
if (channel != null) {
log.debug("Closing channel: {}", channel);
channel.close();
@@ -71,6 +83,11 @@ public class RabbitMQProducer extends DefaultProducer {
conn.close(closeTimeout);
conn = null;
}
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ closeConnectionAndChannel();
if (executorService != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
executorService = null;
@@ -95,6 +112,10 @@ public class RabbitMQProducer extends DefaultProducer {
byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
AMQP.BasicProperties.Builder properties = buildProperties(exchange);
+ if (channel==null) {
+ // Open connection and channel lazily
+ openConnectionAndChannel();
+ }
channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 11b3675..2db5005 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.rabbitmq;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
@@ -26,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
-import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.LongStringHelper;
@@ -132,7 +132,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
}
private ConnectionFactory createConnectionFactory(String uri) {
- RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
+ RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
+ try {
+ endpoint.connect(Executors.newSingleThreadExecutor());
+ } catch(IOException ioExc) {
+ // Doesn't matter if RabbitMQ is not available
+ log.debug("RabbitMQ not available", ioExc);
+ }
return endpoint.getConnectionFactory();
}
@@ -158,16 +164,16 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
@Test
public void testCreateConnectionFactoryCustom() throws Exception {
ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"
- + "?username=userxxx"
- + "&password=passxxx"
- + "&connectionTimeout=123"
- + "&requestedChannelMax=456"
- + "&requestedFrameMax=789"
- + "&requestedHeartbeat=987"
- + "&sslProtocol=true"
- + "&automaticRecoveryEnabled=true"
- + "&networkRecoveryInterval=654"
- + "&topologyRecoveryEnabled=false");
+ + "?username=userxxx"
+ + "&password=passxxx"
+ + "&connectionTimeout=123"
+ + "&requestedChannelMax=456"
+ + "&requestedFrameMax=789"
+ + "&requestedHeartbeat=987"
+ + "&sslProtocol=true"
+ + "&automaticRecoveryEnabled=true"
+ + "&networkRecoveryInterval=654"
+ + "&topologyRecoveryEnabled=false");
assertEquals("localhost", connectionFactory.getHost());
assertEquals(1234, connectionFactory.getPort());
http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
new file mode 100644
index 0000000..6a63948
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -0,0 +1,77 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
+ * is not avaibable.
+ * <ul>
+ * <li>Stop the broker</li>
+ * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * </ul>
+ */
+public class RabbitMQReConnectionIntTest extends CamelTestSupport {
+ private static final String EXCHANGE = "ex3";
+
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directProducer;
+
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
+ "&queue=q3&routingKey=rk3"+
+ "&automaticRecoveryEnabled=true" +
+ "&requestedHeartbeat=1000" +
+ "&connectionTimeout=5000")
+ private Endpoint rabbitMQEndpoint;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mockEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:rabbitMQ")
+ .id("producingRoute")
+ .onException(AlreadyClosedException.class, ConnectException.class)
+ .maximumRedeliveries(10)
+ .redeliveryDelay(500L)
+ .end()
+ .log("Sending message")
+ .inOnly(rabbitMQEndpoint);
+ from(rabbitMQEndpoint)
+ .id("consumingRoute")
+ .log("Receiving message")
+ .to(mockEndpoint);
+ }
+ };
+ }
+ @Test
+ public void testSendEndReceive() throws Exception {
+ int nbMessages=100;
+ int failedMessages=0;
+ for(int i=0;i<nbMessages;i++) {
+ try {
+ directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3");
+ } catch (CamelExecutionException e) {
+ log.debug("Can not send message", e);
+ failedMessages++;
+ }
+ Thread.sleep(500L);
+ }
+ mockEndpoint.assertExchangeReceived(nbMessages-failedMessages);
+ assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+ }
+}
[3/4] git commit: Fix and improve Re-connection integration test
Posted by ni...@apache.org.
Fix and improve Re-connection integration test
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8029d0c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8029d0c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8029d0c7
Branch: refs/heads/master
Commit: 8029d0c7663504b248f94f7daf36eda3e0b61060
Parents: 1191e19
Author: Gerald Quintana <ge...@zenika.com>
Authored: Tue May 6 22:42:11 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Wed May 7 09:02:21 2014 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 6 +--
.../component/rabbitmq/RabbitMQProducer.java | 2 +
.../rabbitmq/RabbitMQReConnectionIntTest.java | 42 +++++++++++---------
3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 0bdc93b..18ed62f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -63,14 +63,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.trace("Creating channel...");
this.channel = conn.createChannel();
log.debug("Created channel: {}", channel);
- }
+
+ getEndpoint().declareExchangeAndQueue(channel);
+ }
/**
* If needed, create Exchange and Queue, then add message listener
*/
private void addConsumer() throws IOException {
- getEndpoint().declareExchangeAndQueue(channel);
-
channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
new RabbitConsumer(this, channel));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 03177e2..f8596c9 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -56,6 +56,8 @@ public class RabbitMQProducer extends DefaultProducer {
log.trace("Creating channel...");
this.channel = conn.createChannel();
log.debug("Created channel: {}", channel);
+
+ getEndpoint().declareExchangeAndQueue(this.channel);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/8029d0c7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 6a63948..512aed4 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -14,11 +14,11 @@ import java.util.concurrent.TimeUnit;
* Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
* is not avaibable.
* <ul>
- * <li>Stop the broker</li>
- * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
- * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
- * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
- * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker</li>
+ * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
* </ul>
*/
public class RabbitMQReConnectionIntTest extends CamelTestSupport {
@@ -28,14 +28,17 @@ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
protected ProducerTemplate directProducer;
@EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
- "&queue=q3&routingKey=rk3"+
+ "&queue=q3&routingKey=rk3" +
"&automaticRecoveryEnabled=true" +
"&requestedHeartbeat=1000" +
"&connectionTimeout=5000")
private Endpoint rabbitMQEndpoint;
- @EndpointInject(uri = "mock:result")
- private MockEndpoint mockEndpoint;
+ @EndpointInject(uri = "mock:producing")
+ private MockEndpoint producingMockEndpoint;
+
+ @EndpointInject(uri = "mock:consuming")
+ private MockEndpoint consumingMockEndpoint;
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -46,32 +49,35 @@ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
from("direct:rabbitMQ")
.id("producingRoute")
.onException(AlreadyClosedException.class, ConnectException.class)
- .maximumRedeliveries(10)
- .redeliveryDelay(500L)
- .end()
+ .maximumRedeliveries(10)
+ .redeliveryDelay(500L)
+ .end()
.log("Sending message")
- .inOnly(rabbitMQEndpoint);
+ .inOnly(rabbitMQEndpoint)
+ .to(producingMockEndpoint);
from(rabbitMQEndpoint)
.id("consumingRoute")
.log("Receiving message")
- .to(mockEndpoint);
+ .to(consumingMockEndpoint);
}
};
}
+
@Test
public void testSendEndReceive() throws Exception {
- int nbMessages=100;
- int failedMessages=0;
- for(int i=0;i<nbMessages;i++) {
+ int nbMessages = 50;
+ int failedMessages = 0;
+ for (int i = 0; i < nbMessages; i++) {
try {
- directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3");
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, "rk3");
} catch (CamelExecutionException e) {
log.debug("Can not send message", e);
failedMessages++;
}
Thread.sleep(500L);
}
- mockEndpoint.assertExchangeReceived(nbMessages-failedMessages);
+ producingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+ consumingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
}
}
[4/4] git commit: CAMEL-6869 merged the patch of Gerald
Posted by ni...@apache.org.
CAMEL-6869 merged the patch of Gerald
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/686d5fcc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/686d5fcc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/686d5fcc
Branch: refs/heads/master
Commit: 686d5fccac3e0308026548b58dfc90c31e41b5c9
Parents: 69b00a3 8029d0c
Author: Willem Jiang <wi...@gmail.com>
Authored: Fri Jun 6 10:58:16 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Fri Jun 6 10:58:16 2014 +0800
----------------------------------------------------------------------
components/camel-rabbitmq/pom.xml | 17 ++-
.../component/rabbitmq/RabbitMQConsumer.java | 114 +++++++++++++----
.../component/rabbitmq/RabbitMQEndpoint.java | 36 ++++--
.../component/rabbitmq/RabbitMQProducer.java | 35 +++++-
.../rabbitmq/RabbitMQEndpointTest.java | 36 +++---
.../rabbitmq/RabbitMQReConnectionIntTest.java | 102 ++++++++++++++++
.../rabbitmq/RabbitMQSpringIntTest.java | 122 +++++++++++++++++++
.../rabbitmq/RabbitMQSpringIntTest-context.xml | 40 ++++++
8 files changed, 443 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 8a9b624,18ed62f..0f1d85f
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@@ -43,42 -48,56 +48,61 @@@ public class RabbitMQConsumer extends D
}
@Override
- protected void doStart() throws Exception {
- executor = endpoint.createExecutor();
- log.debug("Using executor {}", executor);
-
- conn = endpoint.connect(executor);
- log.debug("Using conn {}", conn);
+
- channel = conn.createChannel();
- log.debug("Using channel {}", channel);
+ public RabbitMQEndpoint getEndpoint() {
+ return (RabbitMQEndpoint) super.getEndpoint();
+ }
+ /**
+ * Open connection and channel
+ */
+ private void openConnectionAndChannel() throws IOException {
+ log.trace("Creating connection...");
+ this.conn = getEndpoint().connect(executor);
+ log.debug("Created connection: {}", conn);
+
+ log.trace("Creating channel...");
+ this.channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
-
- getEndpoint().declareExchangeAndQueue(channel);
- }
++ // setup the basicQos
+ if (endpoint.isPrefetchEnabled()) {
+ channel.basicQos(endpoint.getPrefetchSize(),
+ endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+ }
-
- channel.exchangeDeclare(endpoint.getExchangeName(),
- endpoint.getExchangeType(),
- endpoint.isDurable(),
- endpoint.isAutoDelete(),
- new HashMap<String, Object>());
-
- // need to make sure the queueDeclare is same with the exchange declare
- channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
- endpoint.isAutoDelete(), null);
- channel.queueBind(
- endpoint.getQueue(),
- endpoint.getExchangeName(),
- endpoint.getRoutingKey() == null ? "" : endpoint
- .getRoutingKey());
++ getEndpoint().declareExchangeAndQueue(channel);
++ }
+ /**
+ * Add the message listener (consumer) on the endpoint's queue
+ */
+ private void addConsumer() throws IOException {
channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
new RabbitConsumer(this, channel));
}
@Override
- protected void doStop() throws Exception {
+ protected void doStart() throws Exception {
+ executor = endpoint.createExecutor();
+ log.debug("Using executor {}", executor);
+ try {
+ openConnectionAndChannel();
+ addConsumer();
+ } catch (Exception e) {
+ // Open connection, and start message listener in background
+ Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
- final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L;
- startConsumerCallable=new StartConsumerCallable(connectionRetryInterval);
++ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
++ startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+ executor.submit(startConsumerCallable);
+ }
+ }
+
+ /**
+ * If needed, close Connection and Channel
+ */
+ private void closeConnectionAndChannel() throws IOException {
- if (startConsumerCallable!=null) {
++ if (startConsumerCallable != null) {
+ startConsumerCallable.stop();
+ }
if (channel != null) {
log.debug("Closing channel: {}", channel);
channel.close();
@@@ -183,4 -208,38 +213,38 @@@
}
+ /**
+ * Task in charge of opening connection and adding listener when consumer is started
- * and broker is not avaiblable.
++ * and broker is not available.
+ */
+ private class StartConsumerCallable implements Callable<Void> {
+ private final long connectionRetryInterval;
- private final AtomicBoolean running=new AtomicBoolean(true);
++ private final AtomicBoolean running = new AtomicBoolean(true);
+ public StartConsumerCallable(long connectionRetryInterval) {
+ this.connectionRetryInterval = connectionRetryInterval;
+ }
+ public void stop() {
+ running.set(false);
- RabbitMQConsumer.this.startConsumerCallable=null;
++ RabbitMQConsumer.this.startConsumerCallable = null;
+ }
+ @Override
+ public Void call() throws Exception {
- boolean connectionFailed=true;
++ boolean connectionFailed = true;
+ // Reconnection loop
+ while (running.get() && connectionFailed) {
+ try {
+ openConnectionAndChannel();
- connectionFailed=false;
++ connectionFailed = false;
+ } catch (Exception e) {
- log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e);
++ log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
+ Thread.sleep(connectionRetryInterval);
+ }
+ }
+ if (!connectionFailed) {
+ addConsumer();
+ }
+ stop();
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 9400935,7f87f53..e475819
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@@ -71,15 -73,7 +73,15 @@@ public class RabbitMQEndpoint extends D
private Boolean automaticRecoveryEnabled;
private Integer networkRecoveryInterval;
private Boolean topologyRecoveryEnabled;
-
+
+ //If it is true, prefetchSize, prefetchCount, prefetchGlobal will be used for basicQos before starting RabbitMQConsumer
+ private boolean prefetchEnabled;
+ //Default in RabbitMq is 0.
+ private int prefetchSize;
+ private int prefetchCount;
+ //Default value in RabbitMQ is false.
+ private boolean prefetchGlobal;
-
++
public RabbitMQEndpoint() {
}
@@@ -134,7 -128,27 +136,27 @@@
}
}
- protected ConnectionFactory getOrCreateConnectionFactory() {
- /**
++ /**
+ * If needed, declare Exchange, declare Queue and bind them with Routing Key
+ */
+ public void declareExchangeAndQueue(Channel channel) throws IOException {
+ channel.exchangeDeclare(getExchangeName(),
+ getExchangeType(),
+ isDurable(),
+ isAutoDelete(),
+ new HashMap<String, Object>());
- if (getQueue()!=null) {
++ if (getQueue() != null) {
+ // need to make sure the queueDeclare is same with the exchange declare
+ channel.queueDeclare(getQueue(), isDurable(), false,
+ isAutoDelete(), null);
+ channel.queueBind(
+ getQueue(),
+ getExchangeName(),
+ getRoutingKey() == null ? "" : getRoutingKey());
+ }
+ }
+
+ private ConnectionFactory getOrCreateConnectionFactory() {
if (connectionFactory == null) {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(getUsername());
@@@ -299,22 -313,22 +321,22 @@@
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
--
++
public void setBridgeEndpoint(boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
--
++
public boolean isBridgeEndpoint() {
return bridgeEndpoint;
}
--
++
public void setAddresses(String addresses) {
Address[] addressArray = Address.parseAddresses(addresses);
if (addressArray.length > 0) {
this.addresses = addressArray;
}
}
--
++
public Address[] getAddresses() {
return addresses;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 7763423,f8596c9..2e22d3f
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@@ -57,6 -56,8 +56,8 @@@ public class RabbitMQProducer extends D
log.trace("Creating channel...");
this.channel = conn.createChannel();
log.debug("Created channel: {}", channel);
+
- getEndpoint().declareExchangeAndQueue(this.channel);
++ getEndpoint().declareExchangeAndQueue(this.channel);
}
@Override
@@@ -95,6 -114,10 +114,10 @@@
byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
AMQP.BasicProperties.Builder properties = buildProperties(exchange);
- if (channel==null) {
++ if (channel == null) {
+ // Open connection and channel lazily
+ openConnectionAndChannel();
+ }
channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 86a7bcc,2db5005..afae40d
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@@ -109,7 -109,7 +109,7 @@@ public class RabbitMQEndpointTest exten
ThreadPoolExecutor executor = assertIsInstanceOf(ThreadPoolExecutor.class, endpoint.createExecutor());
assertEquals(20, executor.getCorePoolSize());
}
--
++
@Test
public void createEndpointWithAutoAckDisabled() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?autoAck=false", RabbitMQEndpoint.class);
@@@ -122,7 -122,7 +122,7 @@@
assertTrue(endpoint.isSingleton());
}
--
++
@Test
public void brokerEndpointAddressesSettings() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class);
@@@ -133,7 -133,13 +133,13 @@@
private ConnectionFactory createConnectionFactory(String uri) {
RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
- return endpoint.getOrCreateConnectionFactory();
- try {
- endpoint.connect(Executors.newSingleThreadExecutor());
- } catch(IOException ioExc) {
- // Doesn't matter if RabbitMQ is not available
- log.debug("RabbitMQ not available", ioExc);
- }
++ try {
++ endpoint.connect(Executors.newSingleThreadExecutor());
++ } catch (IOException ioExc) {
++ // Doesn't matter if RabbitMQ is not available
++ log.debug("RabbitMQ not available", ioExc);
++ }
+ return endpoint.getConnectionFactory();
}
@Test
@@@ -157,17 -163,17 +163,17 @@@
@Test
public void testCreateConnectionFactoryCustom() throws Exception {
-- ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"
- + "?username=userxxx"
- + "&password=passxxx"
- + "&connectionTimeout=123"
- + "&requestedChannelMax=456"
- + "&requestedFrameMax=789"
- + "&requestedHeartbeat=987"
- + "&sslProtocol=true"
- + "&automaticRecoveryEnabled=true"
- + "&networkRecoveryInterval=654"
- + "&topologyRecoveryEnabled=false");
- + "?username=userxxx"
- + "&password=passxxx"
- + "&connectionTimeout=123"
- + "&requestedChannelMax=456"
- + "&requestedFrameMax=789"
- + "&requestedHeartbeat=987"
- + "&sslProtocol=true"
- + "&automaticRecoveryEnabled=true"
- + "&networkRecoveryInterval=654"
- + "&topologyRecoveryEnabled=false");
++ ConnectionFactory connectionFactory = createConnectionFactory("rabbitmq:localhost:1234/exchange"
++ + "?username=userxxx"
++ + "&password=passxxx"
++ + "&connectionTimeout=123"
++ + "&requestedChannelMax=456"
++ + "&requestedFrameMax=789"
++ + "&requestedHeartbeat=987"
++ + "&sslProtocol=true"
++ + "&automaticRecoveryEnabled=true"
++ + "&networkRecoveryInterval=654"
++ + "&topologyRecoveryEnabled=false");
assertEquals("localhost", connectionFactory.getHost());
assertEquals(1234, connectionFactory.getPort());
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 0000000,512aed4..302440c
mode 000000,100644..100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@@ -1,0 -1,83 +1,102 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+ package org.apache.camel.component.rabbitmq;
+
++import java.net.ConnectException;
++import java.util.concurrent.TimeUnit;
++
+ import com.rabbitmq.client.AlreadyClosedException;
-import org.apache.camel.*;
++
++import org.apache.camel.CamelExecutionException;
++import org.apache.camel.Endpoint;
++import org.apache.camel.EndpointInject;
++import org.apache.camel.Produce;
++import org.apache.camel.ProducerTemplate;
+ import org.apache.camel.builder.RouteBuilder;
+ import org.apache.camel.component.mock.MockEndpoint;
+ import org.apache.camel.test.junit4.CamelTestSupport;
+ import org.junit.Test;
+
-import java.net.ConnectException;
-import java.util.concurrent.TimeUnit;
-
+ /**
+ * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
+ * is not available.
+ * <ul>
+ * <li>Stop the broker</li>
+ * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * </ul>
+ */
+ public class RabbitMQReConnectionIntTest extends CamelTestSupport {
+ private static final String EXCHANGE = "ex3";
+
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directProducer;
+
- @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
- "&queue=q3&routingKey=rk3" +
- "&automaticRecoveryEnabled=true" +
- "&requestedHeartbeat=1000" +
- "&connectionTimeout=5000")
++ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest"
++ + "&queue=q3&routingKey=rk3" + "&automaticRecoveryEnabled=true"
++ + "&requestedHeartbeat=1000" + "&connectionTimeout=5000")
+ private Endpoint rabbitMQEndpoint;
+
+ @EndpointInject(uri = "mock:producing")
+ private MockEndpoint producingMockEndpoint;
+
+ @EndpointInject(uri = "mock:consuming")
+ private MockEndpoint consumingMockEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:rabbitMQ")
+ .id("producingRoute")
+ .onException(AlreadyClosedException.class, ConnectException.class)
+ .maximumRedeliveries(10)
+ .redeliveryDelay(500L)
+ .end()
+ .log("Sending message")
+ .inOnly(rabbitMQEndpoint)
+ .to(producingMockEndpoint);
+ from(rabbitMQEndpoint)
+ .id("consumingRoute")
+ .log("Receiving message")
+ .to(consumingMockEndpoint);
+ }
+ };
+ }
+
+ @Test
+ public void testSendEndReceive() throws Exception {
+ int nbMessages = 50;
+ int failedMessages = 0;
+ for (int i = 0; i < nbMessages; i++) {
+ try {
+ directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, "rk3");
+ } catch (CamelExecutionException e) {
+ log.debug("Can not send message", e);
+ failedMessages++;
+ }
+ Thread.sleep(500L);
+ }
+ producingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+ consumingMockEndpoint.expectedMessageCount(nbMessages - failedMessages);
+ assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/camel/blob/686d5fcc/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
----------------------------------------------------------------------
diff --cc components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
index 0000000,baa4bde..6119082
mode 000000,100644..100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSpringIntTest.java
@@@ -1,0 -1,102 +1,122 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+ package org.apache.camel.component.rabbitmq;
+
-import com.rabbitmq.client.*;
++import java.io.IOException;
++
++import com.rabbitmq.client.AMQP;
++import com.rabbitmq.client.Channel;
++import com.rabbitmq.client.Connection;
++import com.rabbitmq.client.ConnectionFactory;
++import com.rabbitmq.client.DefaultConsumer;
++import com.rabbitmq.client.Envelope;
++
+ import org.apache.camel.Produce;
+ import org.apache.camel.ProducerTemplate;
+ import org.apache.camel.test.spring.CamelSpringJUnit4ClassRunner;
+ import org.junit.After;
+ import org.junit.Before;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.test.context.ContextConfiguration;
-
-import java.io.IOException;
-
+ import static org.junit.Assert.assertEquals;
-
+ /**
+ * Test RabbitMQ component with Spring DSL
+ */
+ @RunWith(CamelSpringJUnit4ClassRunner.class)
+ @ContextConfiguration("RabbitMQSpringIntTest-context.xml")
+ public class RabbitMQSpringIntTest {
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate template;
+ @Autowired
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+
+ private Connection openConnection() throws IOException {
+ if (connection == null) {
+ connection = connectionFactory.newConnection();
+ }
+ return connection;
+ }
+
+ private Channel openChannel() throws IOException {
+ if (channel == null) {
+ channel = openConnection().createChannel();
+ }
+ return channel;
+ }
+
+ @Before
+ public void bindQueueExchange() throws IOException {
+ openChannel();
+ channel.exchangeDeclare("ex2", "direct", true, false, null);
+ channel.queueDeclare("q2", true, false, false, null);
+ channel.queueBind("q2", "ex2", "rk2");
+ }
+
+ @After
+ public void closeConnection() {
+ if (channel != null) {
+ try {
+ channel.close();
+ } catch (IOException e) {
+ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
- private static class LastDeliveryConsumer extends DefaultConsumer {
++ private static final class LastDeliveryConsumer extends DefaultConsumer {
+ private byte[] lastBody;
+
+ private LastDeliveryConsumer(Channel channel) {
+ super(channel);
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ lastBody = body;
+ super.handleDelivery(consumerTag, envelope, properties, body);
+ }
+
+ public byte[] getLastBody() {
+ return lastBody;
+ }
+ }
+
+ @Test
+ public void testSendCsutomConnectionFactory() throws Exception {
+ String body = "Hello Rabbit";
+ template.sendBodyAndHeader(body, RabbitMQConstants.ROUTING_KEY, "rk2");
+
+ openChannel();
+ LastDeliveryConsumer consumer = new LastDeliveryConsumer(channel);
+ channel.basicConsume("q2", true, consumer);
+ int i = 10;
+ while (consumer.getLastBody() == null && i > 0) {
+ Thread.sleep(1000L);
+ i--;
+ }
+ assertEquals(body, new String(consumer.getLastBody()));
+ }
+ }