You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/12/20 19:08:36 UTC
[1/2] git commit: CAMEL-7042: camel-rabbitmq consumer should stop all
resources like the producer does. Thanks to Fergus Nelson for reporting.
Updated Branches:
refs/heads/camel-2.12.x 2f3a336e4 -> f90c23ff3
refs/heads/master 6d8af2f2b -> 0be7ba1aa
CAMEL-7042: camel-rabbitmq consumer should stop all resources like the producer does. Thanks to Fergus Nelson for reporting.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0be7ba1a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0be7ba1a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0be7ba1a
Branch: refs/heads/master
Commit: 0be7ba1aa07f1e3e67baf9f0a7a2c26052ae6277
Parents: 6d8af2f
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 20 19:11:20 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 20 19:11:20 2013 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 29 ++++++++------------
.../rabbitmq/RabbitMQConsumerTest.java | 6 ++--
.../rabbitmq/RabbitMQEndpointTest.java | 1 -
3 files changed, 15 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0be7ba1a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index df3f698..f46e670 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
+ private int closeTimeout = 30 * 1000;
ExecutorService executor;
Connection conn;
Channel channel;
@@ -43,9 +44,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
- super.doStart();
- log.info("Starting RabbitMQ consumer");
-
executor = endpoint.createExecutor();
log.debug("Using executor {}", executor);
@@ -76,27 +74,24 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
- super.doStop();
- log.info("Stopping RabbitMQ consumer");
+ if (channel != null) {
+ log.debug("Closing channel: {}", channel);
+ channel.close();
+ channel = null;
+ }
if (conn != null) {
- try {
- conn.close();
- } catch (Exception ignored) {
- // ignored
- }
+ log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
+ conn.close(closeTimeout);
+ conn = null;
}
-
- channel = null;
- conn = null;
-
if (executor != null) {
- if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ if (endpoint != null && endpoint.getCamelContext() != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
} else {
executor.shutdownNow();
}
+ executor = null;
}
- executor = null;
}
class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
http://git-wip-us.apache.org/repos/asf/camel/blob/0be7ba1a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index 5a17480..a6676b7 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -39,7 +39,7 @@ public class RabbitMQConsumerTest {
private Channel channel = Mockito.mock(Channel.class);
@Test
- public void testStoppingConsumerShutsdownExecutor() throws Exception {
+ public void testStoppingConsumerShutdownExecutor() throws Exception {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
@@ -55,7 +55,7 @@ public class RabbitMQConsumerTest {
}
@Test
- public void testStoppingConsumerShutsdownConnection() throws Exception {
+ public void testStoppingConsumerShutdownConnection() throws Exception {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
@@ -65,6 +65,6 @@ public class RabbitMQConsumerTest {
consumer.doStart();
consumer.doStop();
- Mockito.verify(conn).close();
+ Mockito.verify(conn).close(30 * 1000);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0be7ba1a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 9f22e92..74a6134 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -119,6 +119,5 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals("Wrong size of endpoint addresses.", 2, endpoint.getAddresses().length);
assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]);
assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]);
-
}
}
[2/2] git commit: CAMEL-7042: camel-rabbitmq consumer should stop all
resources like the producer does. Thanks to Fergus Nelson for reporting.
Posted by da...@apache.org.
CAMEL-7042: camel-rabbitmq consumer should stop all resources like the producer does. Thanks to Fergus Nelson for reporting.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f90c23ff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f90c23ff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f90c23ff
Branch: refs/heads/camel-2.12.x
Commit: f90c23ff30edd76a28e4255bbe13e5c1776c65c0
Parents: 2f3a336
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 20 19:11:20 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Dec 20 19:11:37 2013 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 29 ++++++++------------
.../rabbitmq/RabbitMQConsumerTest.java | 6 ++--
.../rabbitmq/RabbitMQEndpointTest.java | 1 -
3 files changed, 15 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f90c23ff/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index df3f698..f46e670 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
+ private int closeTimeout = 30 * 1000;
ExecutorService executor;
Connection conn;
Channel channel;
@@ -43,9 +44,6 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
protected void doStart() throws Exception {
- super.doStart();
- log.info("Starting RabbitMQ consumer");
-
executor = endpoint.createExecutor();
log.debug("Using executor {}", executor);
@@ -76,27 +74,24 @@ public class RabbitMQConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
- super.doStop();
- log.info("Stopping RabbitMQ consumer");
+ if (channel != null) {
+ log.debug("Closing channel: {}", channel);
+ channel.close();
+ channel = null;
+ }
if (conn != null) {
- try {
- conn.close();
- } catch (Exception ignored) {
- // ignored
- }
+ log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
+ conn.close(closeTimeout);
+ conn = null;
}
-
- channel = null;
- conn = null;
-
if (executor != null) {
- if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ if (endpoint != null && endpoint.getCamelContext() != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
} else {
executor.shutdownNow();
}
+ executor = null;
}
- executor = null;
}
class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
http://git-wip-us.apache.org/repos/asf/camel/blob/f90c23ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index 5a17480..a6676b7 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -39,7 +39,7 @@ public class RabbitMQConsumerTest {
private Channel channel = Mockito.mock(Channel.class);
@Test
- public void testStoppingConsumerShutsdownExecutor() throws Exception {
+ public void testStoppingConsumerShutdownExecutor() throws Exception {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
@@ -55,7 +55,7 @@ public class RabbitMQConsumerTest {
}
@Test
- public void testStoppingConsumerShutsdownConnection() throws Exception {
+ public void testStoppingConsumerShutdownConnection() throws Exception {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
@@ -65,6 +65,6 @@ public class RabbitMQConsumerTest {
consumer.doStart();
consumer.doStop();
- Mockito.verify(conn).close();
+ Mockito.verify(conn).close(30 * 1000);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f90c23ff/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 9f22e92..74a6134 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -119,6 +119,5 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertEquals("Wrong size of endpoint addresses.", 2, endpoint.getAddresses().length);
assertEquals("Get a wrong endpoint address.", new Address("server1", 12345), endpoint.getAddresses()[0]);
assertEquals("Get a wrong endpoint address.", new Address("server2", 12345), endpoint.getAddresses()[1]);
-
}
}