You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2018/05/31 07:47:54 UTC

[15/17] james-project git commit: JAMES-2334 Test cross cluster operations

JAMES-2334 Test cross cluster operations


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/00adfa82
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/00adfa82
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/00adfa82

Branch: refs/heads/master
Commit: 00adfa82472cf1ed8e4cc04a5d386c397cda9801
Parents: 749e641
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:30:14 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 121 ++++++++++---------
 1 file changed, 66 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/00adfa82/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index f88ecf4..4a7dd07 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -23,17 +23,21 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -46,15 +50,36 @@ import com.rabbitmq.client.ConnectionFactory;
 @ExtendWith(DockerClusterRabbitMQExtention.class)
 class RabbitMQClusterTest {
 
-    public static final String QUEUE = "queue";
+    private static final String QUEUE = "queue";
+
+    private Connection node1Connection;
+    private Channel node1Channel;
+    private Connection node2Connection;
+    private Channel node2Channel;
+
+    @BeforeEach
+    void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+        ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+        node1Connection = node1ConnectionFactory.newConnection();
+        node2Connection = node2ConnectionFactory.newConnection();
+        node1Channel = node1Connection.createChannel();
+        node2Channel = node2Connection.createChannel();
+    }
 
     @AfterEach
-    public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
-        cluster.getRabbitMQ2()
-            .connectionFactory()
-            .newConnection()
-            .createChannel()
-            .queueDelete(QUEUE);
+    void tearDown() {
+        closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        for (AutoCloseable closeable : closeables) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                //ignoring exception
+            }
+        }
     }
 
     @Test
@@ -71,64 +96,50 @@ class RabbitMQClusterTest {
     }
 
     @Test
-    public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
-        ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+    void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+        node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+        node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+        int nbMessages = 10;
+        IntStream.range(0, nbMessages)
+            .mapToObj(i -> asBytes(String.valueOf(i)))
+            .forEach(Throwing.consumer(
+                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        try (Connection connection = connectionFactory1.newConnection();
-             Channel channel = connection.createChannel()) {
-
-            channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
-            channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
-
-            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
-                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
-        }
 
-        try (Connection connection2 = connectionFactory2.newConnection();
-             Channel channel2 = connection2.createChannel()) {
+        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+        node2Channel.basicConsume(QUEUE, consumer2);
 
-            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
-            channel2.basicConsume(QUEUE, consumer2);
+        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
 
-            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
-
-            assertThat(consumer2.getConsumedMessages())
-                .containsOnlyElementsOf(MESSAGES);
-        }
-
-        assertThat(result)
-            .containsOnlyElementsOf(MESSAGES);
+        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
     }
 
     @Test
-    public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
-        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+    void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+        node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+        node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        try (Connection connection = connectionFactory1.newConnection();
-             Channel channel = connection.createChannel();
-             Connection connection2 = connectionFactory2.newConnection();
-             Channel channel2 = connection2.createChannel()) {
+        int nbMessages = 10;
+        IntStream.range(0, nbMessages)
+            .mapToObj(i -> asBytes(String.valueOf(i)))
+            .forEach(Throwing.consumer(
+                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-            channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-            channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+        node2Channel.basicConsume(QUEUE, consumer2);
 
-            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
-                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
 
-            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
-            channel2.basicConsume(QUEUE, consumer2);
-
-            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+    }
 
-            assertThat(consumer2.getConsumedMessages())
-                .containsOnlyElementsOf(MESSAGES);
-        }
+    private byte[] asBytes(String message) {
+        return message.getBytes(StandardCharsets.UTF_8);
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org