You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/24 09:53:35 UTC
[1/2] camel git commit: CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x a863c8868 -> 968bac1d8
refs/heads/master d518e543a -> 406b83ef6
CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/406b83ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/406b83ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/406b83ef
Branch: refs/heads/master
Commit: 406b83ef66215976149733ff31beaafe04d0af7c
Parents: d518e54
Author: Darrell King <Da...@hermes-europe.co.uk>
Authored: Tue May 24 09:18:53 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 24 11:52:08 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 6 +++--
.../rabbitmq/RabbitMQConsumerTest.java | 26 ++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index eeeafd6..21560f8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -156,11 +156,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (channel == null) {
return;
}
- if (tag != null) {
+ if (tag != null && isChannelOpen()) {
channel.basicCancel(tag);
}
try {
- channel.close();
+ if (isChannelOpen()) {
+ channel.close();
+ }
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index ef6b096..da84477 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
+import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
import org.apache.camel.Processor;
import org.junit.Test;
import org.mockito.Matchers;
@@ -30,6 +32,9 @@ import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
public class RabbitMQConsumerTest {
@@ -69,4 +74,25 @@ public class RabbitMQConsumerTest {
Mockito.verify(conn).close(30 * 1000);
}
+
+ @Test
+ public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
+ AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+
+ RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+ Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+ Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
+ Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+ Mockito.when(conn.createChannel()).thenReturn(channel);
+ Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
+ Mockito.when(channel.isOpen()).thenReturn(false);
+ Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
+ Mockito.doThrow(alreadyClosedException).when(channel).close();
+
+ consumer.doStart();
+ consumer.doStop();
+
+ Mockito.verify(conn).close(30 * 1000);
+ }
}
[2/2] camel git commit: CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.
Posted by da...@apache.org.
CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/968bac1d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/968bac1d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/968bac1d
Branch: refs/heads/camel-2.17.x
Commit: 968bac1d82d1b7c20be5210dad0d0bf8120c43fb
Parents: a863c88
Author: Darrell King <Da...@hermes-europe.co.uk>
Authored: Tue May 24 09:18:53 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 24 11:53:07 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 6 +++--
.../rabbitmq/RabbitMQConsumerTest.java | 26 ++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/968bac1d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index a03e7f8..fb61c4b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -159,11 +159,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (channel == null) {
return;
}
- if (tag != null) {
+ if (tag != null && isChannelOpen()) {
channel.basicCancel(tag);
}
try {
- channel.close();
+ if (isChannelOpen()) {
+ channel.close();
+ }
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
http://git-wip-us.apache.org/repos/asf/camel/blob/968bac1d/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index ef6b096..da84477 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
+import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
import org.apache.camel.Processor;
import org.junit.Test;
import org.mockito.Matchers;
@@ -30,6 +32,9 @@ import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
public class RabbitMQConsumerTest {
@@ -69,4 +74,25 @@ public class RabbitMQConsumerTest {
Mockito.verify(conn).close(30 * 1000);
}
+
+ @Test
+ public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
+ AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+
+ RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+ Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+ Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
+ Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+ Mockito.when(conn.createChannel()).thenReturn(channel);
+ Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
+ Mockito.when(channel.isOpen()).thenReturn(false);
+ Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
+ Mockito.doThrow(alreadyClosedException).when(channel).close();
+
+ consumer.doStart();
+ consumer.doStop();
+
+ Mockito.verify(conn).close(30 * 1000);
+ }
}