You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 10:54:31 UTC
[1/2] camel git commit: RabbitMq consumer should be able to suspend and resume
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x 3d7051793 -> 556600393
refs/heads/master 0fb50fcf5 -> 1d0bc598b
RabbitMq consumer should be able to suspend and resume
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d0bc598
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d0bc598
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d0bc598
Branch: refs/heads/master
Commit: 1d0bc598b3c7e6b7ed7ffa06d6cdf7885ada010d
Parents: 0fb50fc
Author: Preben Asmussen <pr...@gmail.com>
Authored: Mon Dec 28 15:16:06 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 10:53:21 2015 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 43 ++++++----
.../rabbitmq/RabbitMQSupendResumeIntTest.java | 84 ++++++++++++++++++++
2 files changed, 111 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a71769e..eaf2b6c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
-
public class RabbitMQConsumer extends DefaultConsumer {
private ExecutorService executor;
private Connection conn;
@@ -60,7 +59,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
public RabbitMQEndpoint getEndpoint() {
- return (RabbitMQEndpoint) super.getEndpoint();
+ return (RabbitMQEndpoint)super.getEndpoint();
}
/**
@@ -81,8 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.debug("Created channel: {}", channel);
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
}
return channel;
}
@@ -122,16 +120,25 @@ public class RabbitMQConsumer extends DefaultConsumer {
startConsumers();
} catch (Exception e) {
log.info("Connection failed, will start background thread to retry!", e);
- // Open connection, and start message listener in background
- Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
- final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
- startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
- executor.submit(startConsumerCallable);
+ reconnect();
}
}
+ private void reconnect() {
+ // Open connection, and start message listener in background
+ Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+ startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+ executor.submit(startConsumerCallable);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ reconnect();
+ }
+
/**
- * If needed, close Connection and Channels
+ * If needed, close Connection and Channels
*/
private void closeConnectionAndChannel() throws IOException, TimeoutException {
if (startConsumerCallable != null) {
@@ -154,6 +161,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
+ protected void doSuspend() throws Exception {
+ closeConnectionAndChannel();
+ }
+
+ @Override
protected void doStop() throws Exception {
closeConnectionAndChannel();
@@ -186,9 +198,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
-
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
@@ -228,7 +238,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
channel.basicAck(deliveryTag, false);
}
} else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
- // the inOut exchange failed so put the exception in the body and send back
+ // the inOut exchange failed so put the exception in the body
+ // and send back
msg.setBody(exchange.getException());
exchange.setOut(msg);
try {
@@ -282,8 +293,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * Task in charge of opening connection and adding listener when consumer is started
- * and broker is not available.
+ * Task in charge of opening connection and adding listener when consumer is
+ * started and broker is not available.
*/
private class StartConsumerCallable implements Callable<Void> {
private final long connectionRetryInterval;
http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
new file mode 100644
index 0000000..fd269a8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
+ private static final String EXCHANGE = "ex4";
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint resultEndpoint;
+
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+ private Endpoint rabbitMQEndpoint;
+
+ @Produce(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("producer").log("sending ${body}").to(rabbitMQEndpoint);
+ from(rabbitMQEndpoint).routeId("consumer").log("got ${body}").to("mock:result");
+ }
+ };
+ }
+
+ @Test
+ public void testSuspendedResume() throws Exception {
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.expectedBodiesReceived("hello");
+
+ template.sendBody("hello");
+
+ assertMockEndpointsSatisfied();
+
+ context.suspendRoute("consumer");
+
+ // sleep a bit to ensure it's properly suspended
+ Thread.sleep(2000);
+
+ resetMocks();
+ resultEndpoint.expectedMessageCount(0);
+
+ template.sendBody("Hello2");
+
+ assertMockEndpointsSatisfied(1, TimeUnit.SECONDS);
+
+ resetMocks();
+ resultEndpoint.expectedBodiesReceived("Hello2");
+ resultEndpoint.expectedMessageCount(1);
+
+ context.resumeRoute("consumer");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
[2/2] camel git commit: RabbitMq consumer should be able to suspend and resume
Posted by da...@apache.org.
RabbitMq consumer should be able to suspend and resume
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/55660039
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/55660039
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/55660039
Branch: refs/heads/camel-2.16.x
Commit: 556600393b6790291c63effa7350ae7c31e1160a
Parents: 3d70517
Author: Preben Asmussen <pr...@gmail.com>
Authored: Mon Dec 28 15:16:06 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 10:54:23 2015 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 43 ++++++----
.../rabbitmq/RabbitMQSupendResumeIntTest.java | 84 ++++++++++++++++++++
2 files changed, 111 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/55660039/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index cdb23f4..b535915 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
-
public class RabbitMQConsumer extends DefaultConsumer {
private ExecutorService executor;
private Connection conn;
@@ -60,7 +59,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
public RabbitMQEndpoint getEndpoint() {
- return (RabbitMQEndpoint) super.getEndpoint();
+ return (RabbitMQEndpoint)super.getEndpoint();
}
/**
@@ -81,8 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
log.debug("Created channel: {}", channel);
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
}
return channel;
}
@@ -122,16 +120,25 @@ public class RabbitMQConsumer extends DefaultConsumer {
startConsumers();
} catch (Exception e) {
log.info("Connection failed, will start background thread to retry!", e);
- // Open connection, and start message listener in background
- Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
- final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
- startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
- executor.submit(startConsumerCallable);
+ reconnect();
}
}
+ private void reconnect() {
+ // Open connection, and start message listener in background
+ Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+ startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+ executor.submit(startConsumerCallable);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ reconnect();
+ }
+
/**
- * If needed, close Connection and Channels
+ * If needed, close Connection and Channels
*/
private void closeConnectionAndChannel() throws IOException, TimeoutException {
if (startConsumerCallable != null) {
@@ -154,6 +161,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
+ protected void doSuspend() throws Exception {
+ closeConnectionAndChannel();
+ }
+
+ @Override
protected void doStop() throws Exception {
closeConnectionAndChannel();
@@ -186,9 +198,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
-
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
@@ -228,7 +238,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
channel.basicAck(deliveryTag, false);
}
} else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
- // the inOut exchange failed so put the exception in the body and send back
+ // the inOut exchange failed so put the exception in the body
+ // and send back
msg.setBody(exchange.getException());
exchange.setOut(msg);
try {
@@ -282,8 +293,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * Task in charge of opening connection and adding listener when consumer is started
- * and broker is not available.
+ * Task in charge of opening connection and adding listener when consumer is
+ * started and broker is not available.
*/
private class StartConsumerCallable implements Callable<Void> {
private final long connectionRetryInterval;
http://git-wip-us.apache.org/repos/asf/camel/blob/55660039/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
new file mode 100644
index 0000000..fd269a8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
+ private static final String EXCHANGE = "ex4";
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint resultEndpoint;
+
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+ private Endpoint rabbitMQEndpoint;
+
+ @Produce(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("producer").log("sending ${body}").to(rabbitMQEndpoint);
+ from(rabbitMQEndpoint).routeId("consumer").log("got ${body}").to("mock:result");
+ }
+ };
+ }
+
+ @Test
+ public void testSuspendedResume() throws Exception {
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.expectedBodiesReceived("hello");
+
+ template.sendBody("hello");
+
+ assertMockEndpointsSatisfied();
+
+ context.suspendRoute("consumer");
+
+ // sleep a bit to ensure it's properly suspended
+ Thread.sleep(2000);
+
+ resetMocks();
+ resultEndpoint.expectedMessageCount(0);
+
+ template.sendBody("Hello2");
+
+ assertMockEndpointsSatisfied(1, TimeUnit.SECONDS);
+
+ resetMocks();
+ resultEndpoint.expectedBodiesReceived("Hello2");
+ resultEndpoint.expectedMessageCount(1);
+
+ context.resumeRoute("consumer");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}