You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2018/05/31 07:47:54 UTC
[15/17] james-project git commit: JAMES-2334 Test cross cluster operations
JAMES-2334 Test cross cluster operations
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/00adfa82
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/00adfa82
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/00adfa82
Branch: refs/heads/master
Commit: 00adfa82472cf1ed8e4cc04a5d386c397cda9801
Parents: 749e641
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:30:14 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../queue/rabbitmq/RabbitMQClusterTest.java | 121 ++++++++++---------
1 file changed, 66 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/00adfa82/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index f88ecf4..4a7dd07 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -23,17 +23,21 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -46,15 +50,36 @@ import com.rabbitmq.client.ConnectionFactory;
@ExtendWith(DockerClusterRabbitMQExtention.class)
class RabbitMQClusterTest {
- public static final String QUEUE = "queue";
+ private static final String QUEUE = "queue";
+
+ private Connection node1Connection;
+ private Channel node1Channel;
+ private Connection node2Connection;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node1Connection = node1ConnectionFactory.newConnection();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node1Channel = node1Connection.createChannel();
+ node2Channel = node2Connection.createChannel();
+ }
@AfterEach
- public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
- cluster.getRabbitMQ2()
- .connectionFactory()
- .newConnection()
- .createChannel()
- .queueDelete(QUEUE);
+ void tearDown() {
+ closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignoring exception
+ }
+ }
}
@Test
@@ -71,64 +96,50 @@ class RabbitMQClusterTest {
}
@Test
- public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
- ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+ void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- try (Connection connection = connectionFactory1.newConnection();
- Channel channel = connection.createChannel()) {
-
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
- channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
-
- MESSAGES_AS_BYTES.forEach(Throwing.consumer(
- bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- }
- try (Connection connection2 = connectionFactory2.newConnection();
- Channel channel2 = connection2.createChannel()) {
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
- InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
- channel2.basicConsume(QUEUE, consumer2);
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
-
- assertThat(consumer2.getConsumedMessages())
- .containsOnlyElementsOf(MESSAGES);
- }
-
- assertThat(result)
- .containsOnlyElementsOf(MESSAGES);
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}
@Test
- public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
- ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+ void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- try (Connection connection = connectionFactory1.newConnection();
- Channel channel = connection.createChannel();
- Connection connection2 = connectionFactory2.newConnection();
- Channel channel2 = connection2.createChannel()) {
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
- MESSAGES_AS_BYTES.forEach(Throwing.consumer(
- bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
- InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
- channel2.basicConsume(QUEUE, consumer2);
-
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- assertThat(consumer2.getConsumedMessages())
- .containsOnlyElementsOf(MESSAGES);
- }
+ private byte[] asBytes(String message) {
+ return message.getBytes(StandardCharsets.UTF_8);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org