You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/24 09:53:35 UTC

[1/2] camel git commit: CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x a863c8868 -> 968bac1d8
  refs/heads/master d518e543a -> 406b83ef6


CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/406b83ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/406b83ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/406b83ef

Branch: refs/heads/master
Commit: 406b83ef66215976149733ff31beaafe04d0af7c
Parents: d518e54
Author: Darrell King <Da...@hermes-europe.co.uk>
Authored: Tue May 24 09:18:53 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 24 11:52:08 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      |  6 +++--
 .../rabbitmq/RabbitMQConsumerTest.java          | 26 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index eeeafd6..21560f8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -156,11 +156,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (channel == null) {
             return;
         }
-        if (tag != null) {
+        if (tag != null && isChannelOpen()) {
             channel.basicCancel(tag);
         }
         try {
-            channel.close();
+            if (isChannelOpen()) {
+                channel.close();
+            }
         } catch (TimeoutException e) {
             log.error("Timeout occured");
             throw e;

http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index ef6b096..da84477 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
+import com.rabbitmq.client.Consumer;
 import org.apache.camel.Processor;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -30,6 +32,9 @@ import org.mockito.Mockito;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 
 public class RabbitMQConsumerTest {
 
@@ -69,4 +74,25 @@ public class RabbitMQConsumerTest {
 
         Mockito.verify(conn).close(30 * 1000);
     }
+
+    @Test
+    public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
+        AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+        Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
+        Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(channel);
+        Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
+        Mockito.when(channel.isOpen()).thenReturn(false);
+        Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
+        Mockito.doThrow(alreadyClosedException).when(channel).close();
+
+        consumer.doStart();
+        consumer.doStop();
+
+        Mockito.verify(conn).close(30 * 1000);
+    }
 }


[2/2] camel git commit: CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.

Posted by da...@apache.org.
CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/968bac1d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/968bac1d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/968bac1d

Branch: refs/heads/camel-2.17.x
Commit: 968bac1d82d1b7c20be5210dad0d0bf8120c43fb
Parents: a863c88
Author: Darrell King <Da...@hermes-europe.co.uk>
Authored: Tue May 24 09:18:53 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue May 24 11:53:07 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      |  6 +++--
 .../rabbitmq/RabbitMQConsumerTest.java          | 26 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/968bac1d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index a03e7f8..fb61c4b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -159,11 +159,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (channel == null) {
             return;
         }
-        if (tag != null) {
+        if (tag != null && isChannelOpen()) {
             channel.basicCancel(tag);
         }
         try {
-            channel.close();
+            if (isChannelOpen()) {
+                channel.close();
+            }
         } catch (TimeoutException e) {
             log.error("Timeout occured");
             throw e;

http://git-wip-us.apache.org/repos/asf/camel/blob/968bac1d/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index ef6b096..da84477 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
+import com.rabbitmq.client.Consumer;
 import org.apache.camel.Processor;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -30,6 +32,9 @@ import org.mockito.Mockito;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 
 public class RabbitMQConsumerTest {
 
@@ -69,4 +74,25 @@ public class RabbitMQConsumerTest {
 
         Mockito.verify(conn).close(30 * 1000);
     }
+
+    @Test
+    public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
+        AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+        Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
+        Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(channel);
+        Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
+        Mockito.when(channel.isOpen()).thenReturn(false);
+        Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
+        Mockito.doThrow(alreadyClosedException).when(channel).close();
+
+        consumer.doStart();
+        consumer.doStop();
+
+        Mockito.verify(conn).close(30 * 1000);
+    }
 }