You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2018/05/31 07:47:40 UTC
[01/17] james-project git commit: JAMES-2334 checkstyle fixes
Repository: james-project
Updated Branches:
refs/heads/master 10dea04b2 -> 0fdb59c41
JAMES-2334 checkstyle fixes
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0fdb59c4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0fdb59c4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0fdb59c4
Branch: refs/heads/master
Commit: 0fdb59c41cb458bea44cf27c05d9b32e80acf3fc
Parents: c7d08bb
Author: Matthieu Baechler <ma...@apache.org>
Authored: Tue May 29 14:37:26 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../james/mailbox/model/QuotaRatioTest.java | 18 +++++++++---------
.../elasticsearch/json/QuotaRatioAsJsonTest.java | 4 ++--
.../MemoryQuotaSearchTestSystemExtension.java | 2 +-
.../james/queue/rabbitmq/InMemoryConsumer.java | 2 +-
4 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
index 182caf6..3940068 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
@@ -27,50 +27,50 @@ import org.junit.jupiter.api.Test;
import nl.jqno.equalsverifier.EqualsVerifier;
-public class QuotaRatioTest {
+class QuotaRatioTest {
- private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize> builder()
+ private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize>builder()
.used(QuotaSize.size(15))
.computedLimit(QuotaSize.size(60))
.build();
- private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount> builder()
+ private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount>builder()
.used(QuotaCount.count(1))
.computedLimit(QuotaCount.count(2))
.build();
@Test
- public void shouldMatchBeanContact() {
+ void shouldMatchBeanContact() {
EqualsVerifier.forClass(QuotaRatio.class)
.allFieldsShouldBeUsed()
.verify();
}
@Test
- public void quotaRatioShouldThrowWhenQuotaSizeIsNull() {
+ void quotaRatioShouldThrowWhenQuotaSizeIsNull() {
assertThatThrownBy(() -> QuotaRatio.from(null, QUOTA_COUNT))
.isInstanceOf(NullPointerException.class);
}
@Test
- public void quotaRatioShouldThrowWhenQuotaCountIsNull() {
+ void quotaRatioShouldThrowWhenQuotaCountIsNull() {
assertThatThrownBy(() -> QuotaRatio.from(QUOTA_SIZE, null))
.isInstanceOf(NullPointerException.class);
}
@Test
- public void quotaSizeShouldReturnTheQuotaSize() {
+ void quotaSizeShouldReturnTheQuotaSize() {
QuotaRatio quotaRatio = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT);
assertThat(quotaRatio.getQuotaSize()).isEqualTo(QUOTA_SIZE);
}
@Test
- public void quotaCountShouldReturnTheQuotaCount() {
+ void quotaCountShouldReturnTheQuotaCount() {
QuotaRatio quotaRatio = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT);
assertThat(quotaRatio.getQuotaCount()).isEqualTo(QUOTA_COUNT);
}
@Test
- public void maxShouldReturnTheMaxRatio() {
+ void maxShouldReturnTheMaxRatio() {
double max = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT)
.max();
http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
index fb16937..e5046ba 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
@@ -33,11 +33,11 @@ import nl.jqno.equalsverifier.EqualsVerifier;
public class QuotaRatioAsJsonTest {
- private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize> builder()
+ private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize>builder()
.used(QuotaSize.size(15))
.computedLimit(QuotaSize.size(60))
.build();
- private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount> builder()
+ private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount>builder()
.used(QuotaCount.count(1))
.computedLimit(QuotaCount.count(2))
.build();
http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
index e6c8f05..12394e7 100644
--- a/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.extension.ParameterResolver;
public class MemoryQuotaSearchTestSystemExtension implements ParameterResolver {
- private static final Runnable NO_AWAIT = () -> {};
+ private static final Runnable NO_AWAIT = () -> { };
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index 6dd29af..211a9ae 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -39,7 +39,7 @@ public class InMemoryConsumer extends DefaultConsumer {
private final Operation operation;
public InMemoryConsumer(Channel channel) {
- this(channel, () -> {});
+ this(channel, () -> { });
}
public InMemoryConsumer(Channel channel, Operation operation) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[10/17] james-project git commit: JAMES-2334 Demonstrate that
workQueue is working
Posted by ma...@apache.org.
JAMES-2334 Demonstrate that workQueue is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b07fd7e0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b07fd7e0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b07fd7e0
Branch: refs/heads/master
Commit: b07fd7e0abdc60a37f41f26e8daa152d49adf596
Parents: 9f78f0d
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 10:28:01 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../james/queue/rabbitmq/InMemoryConsumer.java | 49 ++++++
.../james/queue/rabbitmq/RabbitMQFixture.java | 3 +
.../james/queue/rabbitmq/RabbitMQTest.java | 154 +++++++++++++------
3 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
new file mode 100644
index 0000000..f5f8fa2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+
+public class InMemoryConsumer extends DefaultConsumer {
+
+ private final ConcurrentLinkedQueue<Integer> messages;
+
+ public InMemoryConsumer(Channel channel) {
+ super(channel);
+ messages = new ConcurrentLinkedQueue<>();
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
+ messages.add(payload);
+ }
+
+ public Queue<Integer> getConsumedMessages() {
+ return messages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
index e216690..3ed6237 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
@@ -34,6 +34,9 @@ public class RabbitMQFixture {
public static final String EXCHANGE_NAME = "exchangeName";
public static final String ROUTING_KEY = "routingKey";
public static final String DIRECT = "direct";
+ public static final boolean EXCLUSIVE = true;
+ public static final boolean AUTO_DELETE = true;
+ public static final String WORK_QUEUE = "workQueue";
public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS;
public static ConditionFactory calmlyAwait = Awaitility.with()
http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index cad0303..2463516 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -19,34 +19,38 @@
package org.apache.james.queue.rabbitmq;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.WORK_QUEUE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
@ExtendWith(DockerRabbitMQExtension.class)
class RabbitMQTest {
@@ -87,44 +91,65 @@ class RabbitMQTest {
return false;
}
}
-
}
@Nested
- class BroadcastTest {
+ class FourConnections {
private ConnectionFactory connectionFactory1;
private ConnectionFactory connectionFactory2;
private ConnectionFactory connectionFactory3;
private ConnectionFactory connectionFactory4;
+ private Connection connection1;
+ private Connection connection2;
+ private Connection connection3;
+ private Connection connection4;
+ private Channel publisherChannel;
+ private Channel subscriberChannel2;
+ private Channel subscriberChannel3;
+ private Channel subscriberChannel4;
@BeforeEach
- public void setup(DockerRabbitMQ rabbitMQ) {
+ void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
connectionFactory1 = rabbitMQ.connectionFactory();
connectionFactory2 = rabbitMQ.connectionFactory();
connectionFactory3 = rabbitMQ.connectionFactory();
connectionFactory4 = rabbitMQ.connectionFactory();
+ connection1 = connectionFactory1.newConnection();
+ connection2 = connectionFactory2.newConnection();
+ connection3 = connectionFactory3.newConnection();
+ connection4 = connectionFactory4.newConnection();
+ publisherChannel = connection1.createChannel();
+ subscriberChannel2 = connection2.createChannel();
+ subscriberChannel3 = connection3.createChannel();
+ subscriberChannel4 = connection4.createChannel();
}
- // In the following case, each consumer will receive the messages produced by the
- // producer
- // To do so, each consumer will bind it's queue to the producer exchange.
- @Test
- public void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
- ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
- ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>();
-
- try (Connection connection1 = connectionFactory1.newConnection();
- Channel publisherChannel = connection1.createChannel();
- Connection connection2 = connectionFactory2.newConnection();
- Channel subscriberChannel2 = connection2.createChannel();
- Connection connection3 = connectionFactory3.newConnection();
- Channel subscriberChannel3 = connection3.createChannel();
- Connection connection4 = connectionFactory4.newConnection();
- Channel subscriberChannel4 = connection4.createChannel()) {
+ @AfterEach
+ void tearDown() {
+ closeQuietly(
+ publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4,
+ connection1, connection2, connection3, connection4);
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignoring exception
+ }
+ }
+ }
+
+ @Nested
+ class BroadCast {
+ // In the following case, each consumer will receive the messages produced by the
+ // producer
+ // To do so, each consumer will bind it's queue to the producer exchange.
+ @Test
+ void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
// Declare a single exchange and three queues attached to it.
publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
@@ -135,9 +160,12 @@ class RabbitMQTest {
String queue4 = subscriberChannel4.queueDeclare().getQueue();
subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
- subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2));
- subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3));
- subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4));
+ InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
+ InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
+ InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
+ subscriberChannel2.basicConsume(queue2, consumer2);
+ subscriberChannel3.basicConsume(queue3, consumer3);
+ subscriberChannel4.basicConsume(queue4, consumer4);
// the publisher will produce 10 messages
IntStream.range(0, 10)
@@ -146,30 +174,68 @@ class RabbitMQTest {
.forEach(Throwing.consumer(
bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4));
+ awaitAtMostOneMinute.until(
+ () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
+ ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
// Check every subscriber have receive all the messages.
- assertThat(results2).containsOnlyElementsOf(expectedResult);
- assertThat(results3).containsOnlyElementsOf(expectedResult);
- assertThat(results4).containsOnlyElementsOf(expectedResult);
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}
}
- private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) {
- return Iterables.size(
- Iterables.concat(results2, results3, results4))
- == expectedResult.size() * 3;
+ @Nested
+ class WorkQueue {
+
+ // In the following case, consumers will receive the messages produced by the
+ // producer but will share them.
+ // To do so, we will bind a single queue to the producer exchange.
+ @Test
+ void rabbitMQShouldSupportTheWorkQueueCase() throws Exception {
+ int nbMessages = 100;
+
+ // Declare the exchange and a single queue attached to it.
+ publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+ publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ // Publisher will produce 100 messages
+ IntStream.range(0, nbMessages)
+ .mapToObj(String::valueOf)
+ .map(s -> s.getBytes(StandardCharsets.UTF_8))
+ .forEach(Throwing.consumer(
+ bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
+ InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
+ InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
+ subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
+ subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
+ subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+
+ awaitAtMostOneMinute.until(
+ () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages);
+
+ ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+
+ assertThat(
+ Iterables.concat(
+ consumer2.getConsumedMessages(),
+ consumer3.getConsumedMessages(),
+ consumer4.getConsumedMessages()))
+ .containsOnlyElementsOf(expectedResult);
+ }
+
}
- private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) {
- return new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
- results.add(payload);
- }
- };
+ private long countReceivedMessages(InMemoryConsumer... consumers) {
+ return Arrays.stream(consumers)
+ .map(InMemoryConsumer::getConsumedMessages)
+ .mapToLong(Queue::size)
+ .sum();
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[13/17] james-project git commit: JAMES-2334 Test cross cluster
operations
Posted by ma...@apache.org.
JAMES-2334 Test cross cluster operations
- Pub/sub on different nodes
- Exchange/queue binding on different nodes
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/749e6413
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/749e6413
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/749e6413
Branch: refs/heads/master
Commit: 749e64138816e220b96134389ddd5a03cac188d5
Parents: 4439551
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:20:17 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../queue/rabbitmq/RabbitMQClusterTest.java | 91 ++++++++++++++++++++
1 file changed, 91 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/749e6413/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 8874641..f88ecf4 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -18,15 +18,45 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
@ExtendWith(DockerClusterRabbitMQExtention.class)
class RabbitMQClusterTest {
+ public static final String QUEUE = "queue";
+
+ @AfterEach
+ public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
+ cluster.getRabbitMQ2()
+ .connectionFactory()
+ .newConnection()
+ .createChannel()
+ .queueDelete(QUEUE);
+ }
+
@Test
void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
String stdout = cluster.getRabbitMQ1().container()
@@ -40,4 +70,65 @@ class RabbitMQClusterTest {
DockerClusterRabbitMQExtention.RABBIT_3);
}
+ @Test
+ public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+ ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+
+ ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
+ ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+
+ try (Connection connection = connectionFactory1.newConnection();
+ Channel channel = connection.createChannel()) {
+
+ channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+ bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ }
+
+ try (Connection connection2 = connectionFactory2.newConnection();
+ Channel channel2 = connection2.createChannel()) {
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+ channel2.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+ assertThat(consumer2.getConsumedMessages())
+ .containsOnlyElementsOf(MESSAGES);
+ }
+
+ assertThat(result)
+ .containsOnlyElementsOf(MESSAGES);
+ }
+
+ @Test
+ public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+ ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
+ ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+
+ try (Connection connection = connectionFactory1.newConnection();
+ Channel channel = connection.createChannel();
+ Connection connection2 = connectionFactory2.newConnection();
+ Channel channel2 = connection2.createChannel()) {
+
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+ bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+ channel2.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+ assertThat(consumer2.getConsumedMessages())
+ .containsOnlyElementsOf(MESSAGES);
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[16/17] james-project git commit: JAMES-2334 Improve rabbit mq image
logging
Posted by ma...@apache.org.
JAMES-2334 Improve rabbit mq image logging
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c7d08bb7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c7d08bb7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c7d08bb7
Branch: refs/heads/master
Commit: c7d08bb72e0ca8cc214ae5f66841aedad069f511
Parents: 21eec77
Author: benwa <bt...@linagora.com>
Authored: Tue May 29 10:13:44 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../james/queue/rabbitmq/DockerRabbitMQ.java | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/c7d08bb7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index d890c2b..02f89b2 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -112,19 +112,28 @@ public class DockerRabbitMQ {
}
public void join(DockerRabbitMQ rabbitMQ) throws Exception {
- container()
+ stopApp();
+ joinCluster(rabbitMQ);
+ }
+
+ private void stopApp() throws java.io.IOException, InterruptedException {
+ String stdout = container()
.execInContainer("rabbitmqctl", "stop_app")
.getStdout();
- container()
+ LOGGER.debug("stop_app: {}", stdout);
+ }
+
+ private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException {
+ String stdout = container()
.execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
.getStdout();
+ LOGGER.debug("join_cluster: {}", stdout);
}
public void startApp() throws Exception {
String stdout = container()
.execInContainer("rabbitmqctl", "start_app")
.getStdout();
- System.out.println(stdout);
-
+ LOGGER.debug("start_app: {}", stdout);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[11/17] james-project git commit: JAMES-2334 Demonstrate that routing
is working
Posted by ma...@apache.org.
JAMES-2334 Demonstrate that routing is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/94ddf6a0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/94ddf6a0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/94ddf6a0
Branch: refs/heads/master
Commit: 94ddf6a0817b5adcf32666075cc1514ebd96e67e
Parents: b07fd7e
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 14:55:06 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../james/queue/rabbitmq/RabbitMQTest.java | 136 +++++++++++++------
1 file changed, 96 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/94ddf6a0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 2463516..5e8f311 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -55,8 +55,6 @@ import com.rabbitmq.client.ConnectionFactory;
@ExtendWith(DockerRabbitMQExtension.class)
class RabbitMQTest {
- private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
-
@Nested
class SingleConsumerTest {
@@ -81,7 +79,7 @@ class RabbitMQTest {
}
private void publishAMessage(Channel channel) throws IOException {
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!"));
}
private Boolean messageReceived(Channel channel, String queueName) {
@@ -104,10 +102,10 @@ class RabbitMQTest {
private Connection connection2;
private Connection connection3;
private Connection connection4;
- private Channel publisherChannel;
- private Channel subscriberChannel2;
- private Channel subscriberChannel3;
- private Channel subscriberChannel4;
+ private Channel channel1;
+ private Channel channel2;
+ private Channel channel3;
+ private Channel channel4;
@BeforeEach
void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
@@ -119,16 +117,16 @@ class RabbitMQTest {
connection2 = connectionFactory2.newConnection();
connection3 = connectionFactory3.newConnection();
connection4 = connectionFactory4.newConnection();
- publisherChannel = connection1.createChannel();
- subscriberChannel2 = connection2.createChannel();
- subscriberChannel3 = connection3.createChannel();
- subscriberChannel4 = connection4.createChannel();
+ channel1 = connection1.createChannel();
+ channel2 = connection2.createChannel();
+ channel3 = connection3.createChannel();
+ channel4 = connection4.createChannel();
}
@AfterEach
void tearDown() {
closeQuietly(
- publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4,
+ channel1, channel2, channel3, channel4,
connection1, connection2, connection3, connection4);
}
@@ -151,28 +149,28 @@ class RabbitMQTest {
@Test
void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
// Declare a single exchange and three queues attached to it.
- publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-
- String queue2 = subscriberChannel2.queueDeclare().getQueue();
- subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
- String queue3 = subscriberChannel3.queueDeclare().getQueue();
- subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
- String queue4 = subscriberChannel4.queueDeclare().getQueue();
- subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
-
- InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
- InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
- InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
- subscriberChannel2.basicConsume(queue2, consumer2);
- subscriberChannel3.basicConsume(queue3, consumer3);
- subscriberChannel4.basicConsume(queue4, consumer4);
+ channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+ String queue2 = channel2.queueDeclare().getQueue();
+ channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+ String queue3 = channel3.queueDeclare().getQueue();
+ channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+ String queue4 = channel4.queueDeclare().getQueue();
+ channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+ InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+ InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+ channel2.basicConsume(queue2, consumer2);
+ channel3.basicConsume(queue3, consumer3);
+ channel4.basicConsume(queue4, consumer4);
// the publisher will produce 10 messages
IntStream.range(0, 10)
.mapToObj(String::valueOf)
- .map(s -> s.getBytes(StandardCharsets.UTF_8))
+ .map(RabbitMQTest.this::asBytes)
.forEach(Throwing.consumer(
- bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
awaitAtMostOneMinute.until(
() -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
@@ -196,23 +194,23 @@ class RabbitMQTest {
int nbMessages = 100;
// Declare the exchange and a single queue attached to it.
- publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
- publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
- publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+ channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
// Publisher will produce 100 messages
IntStream.range(0, nbMessages)
.mapToObj(String::valueOf)
- .map(s -> s.getBytes(StandardCharsets.UTF_8))
+ .map(RabbitMQTest.this::asBytes)
.forEach(Throwing.consumer(
- bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
- InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
- InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
- subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
- subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
- subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+ InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+ InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+ InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+ channel2.basicConsume(WORK_QUEUE, consumer2);
+ channel3.basicConsume(WORK_QUEUE, consumer3);
+ channel4.basicConsume(WORK_QUEUE, consumer4);
awaitAtMostOneMinute.until(
() -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages);
@@ -229,6 +227,61 @@ class RabbitMQTest {
}
+ @Nested
+ class Routing {
+ @Test
+ void rabbitMQShouldSupportRouting() throws Exception {
+ String conversation1 = "c1";
+ String conversation2 = "c2";
+ String conversation3 = "c3";
+ String conversation4 = "c4";
+
+ // Declare the exchange and a single queue attached to it.
+ channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+ String queue1 = channel1.queueDeclare().getQueue();
+ // 1 will follow conversation 1 and 2
+ channel1.queueBind(queue1, EXCHANGE_NAME, conversation1);
+ channel1.queueBind(queue1, EXCHANGE_NAME, conversation2);
+
+ String queue2 = channel2.queueDeclare().getQueue();
+ // 2 will follow conversation 2 and 3
+ channel2.queueBind(queue2, EXCHANGE_NAME, conversation2);
+ channel2.queueBind(queue2, EXCHANGE_NAME, conversation3);
+
+ String queue3 = channel3.queueDeclare().getQueue();
+ // 3 will follow conversation 3 and 4
+ channel3.queueBind(queue3, EXCHANGE_NAME, conversation3);
+ channel3.queueBind(queue3, EXCHANGE_NAME, conversation4);
+
+ String queue4 = channel4.queueDeclare().getQueue();
+ // 4 will follow conversation 1 and 4
+ channel4.queueBind(queue4, EXCHANGE_NAME, conversation1);
+ channel4.queueBind(queue4, EXCHANGE_NAME, conversation4);
+
+ channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1"));
+ channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2"));
+ channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3"));
+ channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4"));
+
+ InMemoryConsumer consumer1 = new InMemoryConsumer(channel1);
+ InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+ InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+ InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+ channel1.basicConsume(queue1, consumer1);
+ channel2.basicConsume(queue2, consumer2);
+ channel3.basicConsume(queue3, consumer3);
+ channel4.basicConsume(queue4, consumer4);
+
+ awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8);
+
+ assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2);
+ assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3);
+ assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4);
+ assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4);
+ }
+ }
+
private long countReceivedMessages(InMemoryConsumer... consumers) {
return Arrays.stream(consumers)
.map(InMemoryConsumer::getConsumedMessages)
@@ -238,5 +291,8 @@ class RabbitMQTest {
}
+ private byte[] asBytes(String message) {
+ return message.getBytes(StandardCharsets.UTF_8);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[03/17] james-project git commit: JAMES-2334 Introduce a dockerized
rabbitMQ cluster
Posted by ma...@apache.org.
JAMES-2334 Introduce a dockerized rabbitMQ cluster
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/44395515
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/44395515
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/44395515
Branch: refs/heads/master
Commit: 44395515f58cfa7442927f209fd015ed7b6c723b
Parents: 8767d00
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:18:50 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
server/queue/queue-rabbitmq/pom.xml | 5 +
.../DockerClusterRabbitMQExtention.java | 112 +++++++++++++++++++
.../james/queue/rabbitmq/DockerRabbitMQ.java | 53 ++++++++-
.../queue/rabbitmq/DockerRabbitMQExtension.java | 4 +-
.../queue/rabbitmq/RabbitMQClusterTest.java | 43 +++++++
5 files changed, 210 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index d7d8ad7..86bbfa2 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -54,6 +54,11 @@
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
new file mode 100644
index 0000000..fae2016
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
@@ -0,0 +1,112 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.google.common.collect.ImmutableList;
+
+public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+ public static final String RABBIT_1 = "rabbit1";
+ public static final String RABBIT_2 = "rabbit2";
+ public static final String RABBIT_3 = "rabbit3";
+ private DockerRabbitMQCluster cluster;
+ private Network network;
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ String cookie = DigestUtils.sha1Hex("secret cookie here");
+
+ network = Network.NetworkImpl.builder()
+ .enableIpv6(false)
+ .createNetworkCmdModifiers(ImmutableList.of())
+ .build();
+
+ DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
+ DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
+ DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
+
+ rabbitMQ1.start();
+ rabbitMQ2.start();
+ rabbitMQ3.start();
+
+ rabbitMQ2.join(rabbitMQ1);
+ rabbitMQ3.join(rabbitMQ1);
+
+ rabbitMQ2.startApp();
+ rabbitMQ3.startApp();
+
+ cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ cluster.stop();
+ network.close();
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return cluster;
+ }
+
+ public static class DockerRabbitMQCluster {
+
+ private final DockerRabbitMQ rabbitMQ1;
+ private final DockerRabbitMQ rabbitMQ2;
+ private final DockerRabbitMQ rabbitMQ3;
+
+ public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+ this.rabbitMQ1 = rabbitMQ1;
+ this.rabbitMQ2 = rabbitMQ2;
+ this.rabbitMQ3 = rabbitMQ3;
+ }
+
+ public void stop() {
+ rabbitMQ1.stop();
+ rabbitMQ2.stop();
+ rabbitMQ3.stop();
+ }
+
+ public DockerRabbitMQ getRabbitMQ1() {
+ return rabbitMQ1;
+ }
+
+ public DockerRabbitMQ getRabbitMQ2() {
+ return rabbitMQ2;
+ }
+
+ public DockerRabbitMQ getRabbitMQ3() {
+ return rabbitMQ3;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 8fea5b0..a964b99 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,30 +18,50 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;
+import java.util.Optional;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
import com.rabbitmq.client.ConnectionFactory;
public class DockerRabbitMQ {
private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
+ private static final String DEFAULT_RABBIT_NODE = "my-rabbit";
private static final int DEFAULT_RABBITMQ_PORT = 5672;
- private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
+ private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE";
+ private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME";
+
+ private final GenericContainer<?> container;
+ private final Optional<String> nodeName;
- private GenericContainer<?> container;
+ public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) {
+ return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName),
+ Optional.of(network));
+ }
+
+ public static DockerRabbitMQ withoutCookie() {
+ return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+ }
@SuppressWarnings("resource")
- public DockerRabbitMQ() {
+ private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
container = new GenericContainer<>("rabbitmq:3.7.5")
- .withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
+ .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse("localhost")))
+ .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
.withExposedPorts(DEFAULT_RABBITMQ_PORT)
.waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this))
.withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()));
+ net.ifPresent(container::withNetwork);
+ erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie));
+ nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name));
+ this.nodeName = nodeName;
}
public String getHostIp() {
@@ -81,4 +101,29 @@ public class DockerRabbitMQ {
DockerClientFactory.instance().client()
.restartContainerCmd(container.getContainerId());
}
+
+ public GenericContainer<?> container() {
+ return container;
+ }
+
+ public String node() {
+ return nodeName.get();
+ }
+
+ public void join(DockerRabbitMQ rabbitMQ) throws Exception {
+ container()
+ .execInContainer("rabbitmqctl", "stop_app")
+ .getStdout();
+ container()
+ .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
+ .getStdout();
+ }
+
+ public void startApp() throws Exception {
+ String stdout = container()
+ .execInContainer("rabbitmqctl", "start_app")
+ .getStdout();
+ System.out.println(stdout);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
index 925ff08..b0cbfd7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
@@ -18,9 +18,7 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;
-import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
@@ -33,7 +31,7 @@ public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCal
@Override
public void beforeEach(ExtensionContext context) {
- rabbitMQ = new DockerRabbitMQ();
+ rabbitMQ = DockerRabbitMQ.withoutCookie();
rabbitMQ.start();
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
new file mode 100644
index 0000000..8874641
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(DockerClusterRabbitMQExtention.class)
+class RabbitMQClusterTest {
+
+ @Test
+ void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+ String stdout = cluster.getRabbitMQ1().container()
+ .execInContainer("rabbitmqctl", "cluster_status")
+ .getStdout();
+
+ assertThat(stdout)
+ .contains(
+ DockerClusterRabbitMQExtention.RABBIT_1,
+ DockerClusterRabbitMQExtention.RABBIT_2,
+ DockerClusterRabbitMQExtention.RABBIT_3);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[12/17] james-project git commit: JAMES-2334 Demonstrate that
durability is working
Posted by ma...@apache.org.
JAMES-2334 Demonstrate that durability is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/fb67d5f2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/fb67d5f2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/fb67d5f2
Branch: refs/heads/master
Commit: fb67d5f2cebda481b2ff659d90a764e64133bc28
Parents: 94ddf6a
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:00:10 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../james/queue/rabbitmq/DockerRabbitMQ.java | 6 ++
.../james/queue/rabbitmq/RabbitMQTest.java | 66 +++++++++++++++-----
2 files changed, 55 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/fb67d5f2/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 0eab7a2..8c7bae8 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;
+import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import com.rabbitmq.client.ConnectionFactory;
@@ -71,4 +72,9 @@ public class DockerRabbitMQ {
public void stop() {
container.stop();
}
+
+ public void restart() {
+ DockerClientFactory.instance().client()
+ .restartContainerCmd(container.getContainerId());
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/fb67d5f2/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 5e8f311..b81035e 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -58,16 +58,46 @@ class RabbitMQTest {
@Nested
class SingleConsumerTest {
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
+ connectionFactory = rabbitMQ.connectionFactory();
+ connection = connectionFactory.newConnection();
+ channel = connection.createChannel();
+ }
+
+ @AfterEach
+ void tearDown() {
+ closeQuietly(connection, channel);
+ }
+
+ @Test
+ void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception {
+ String queueName = createQueue(channel);
+ publishAMessage(channel);
+ awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+ }
+
@Test
- void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
- ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
- try (Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel()) {
- String queueName = createQueue(channel);
+ void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception {
+ String queueName = createQueue(channel);
+ publishAMessage(channel);
- publishAMessage(channel);
+ rabbitMQ.restart();
- awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+ awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ));
+ assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull();
+ }
+
+ private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) {
+ try {
+ rabbitMQ.connectionFactory().newConnection();
+ return true;
+ } catch (Exception e) {
+ return false;
}
}
@@ -130,16 +160,6 @@ class RabbitMQTest {
connection1, connection2, connection3, connection4);
}
- private void closeQuietly(AutoCloseable... closeables) {
- for (AutoCloseable closeable : closeables) {
- try {
- closeable.close();
- } catch (Exception e) {
- //ignoring exception
- }
- }
- }
-
@Nested
class BroadCast {
@@ -291,6 +311,18 @@ class RabbitMQTest {
}
+ private void closeQuietly(AutoCloseable... closeables) {
+ Arrays.stream(closeables).forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignore error
+ }
+ }
+
private byte[] asBytes(String message) {
return message.getBytes(StandardCharsets.UTF_8);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[04/17] james-project git commit: JAMES-2334 Introduce RabbitMQ
implementation
Posted by ma...@apache.org.
JAMES-2334 Introduce RabbitMQ implementation
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/5041a0e6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/5041a0e6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/5041a0e6
Branch: refs/heads/master
Commit: 5041a0e65a19eefa8767ad53d94ac6ffa7c1fcd0
Parents: 61e94c6
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 14:31:23 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
pom.xml | 5 +++++
server/pom.xml | 3 ++-
server/queue/queue-rabbitmq/pom.xml | 37 ++++++++++++++++++++++++++++++++
3 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0be0c28..91dfb14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1444,6 +1444,11 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>james-server-queue-rabbitmq</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>james-server-queue-api</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 56545e3..f920a73 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -98,12 +98,13 @@
<module>protocols/protocols-smtp</module>
<module>protocols/webadmin-integration-test</module>
<module>protocols/webadmin</module>
- <module>queue/queue-activemq</module>
+ <module>queue/queue-activemq</module>
<module>queue/queue-api</module>
<module>queue/queue-file</module>
<module>queue/queue-jms</module>
<module>queue/queue-memory</module>
+ <module>queue/queue-rabbitmq</module>
<module>task</module>
<module>testing</module>
http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
new file mode 100644
index 0000000..950089f
--- /dev/null
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-server</artifactId>
+ <version>3.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>james-server-queue-rabbitmq</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache James :: Server :: Mail Queue :: RabbitMQ</name>
+
+ <dependencies>
+ </dependencies>
+</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[05/17] james-project git commit: JAMES-2334 upgrade testcontainers
Posted by ma...@apache.org.
JAMES-2334 upgrade testcontainers
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8d568fac
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8d568fac
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8d568fac
Branch: refs/heads/master
Commit: 8d568fac49514fb308041150acdc27062e11e11f
Parents: 10dea04
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 11:21:28 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../cassandra/CassandraWaitStrategy.java | 19 +++++++++++--------
.../backends/cassandra/DockerCassandraRule.java | 5 +++--
.../apache/james/mailbox/tika/TikaContainer.java | 2 +-
pom.xml | 2 +-
.../james/util/docker/SwarmGenericContainer.java | 2 +-
.../util/scanner/SpamAssassinExtension.java | 2 +-
.../util/scanner/SpamAssassinWaitStrategy.java | 17 ++++++++++-------
.../james/user/ldap/LdapGenericContainer.java | 2 +-
.../transport/mailets/SpamAssassinTest.java | 2 +-
.../org/apache/james/jmap/ContainerTest.java | 2 +-
.../java/org/apache/james/utils/FakeSmtp.java | 2 +-
11 files changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
index d6f71ff..bb47552 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
@@ -25,28 +25,31 @@ import java.util.concurrent.TimeUnit;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import com.google.common.primitives.Ints;
public class CassandraWaitStrategy implements WaitStrategy {
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
- private Duration timeout = DEFAULT_TIMEOUT;
+ private final GenericContainer<?> cassandraContainer;
+ private final Duration timeout;
- public CassandraWaitStrategy() {
- this(DEFAULT_TIMEOUT);
+ public CassandraWaitStrategy(GenericContainer<?> cassandraContainer) {
+ this(cassandraContainer, DEFAULT_TIMEOUT);
}
- public CassandraWaitStrategy(Duration timeout) {
+ public CassandraWaitStrategy(GenericContainer<?> cassandraContainer, Duration timeout) {
+ this.cassandraContainer = cassandraContainer;
this.timeout = timeout;
}
@Override
- public void waitUntilReady(@SuppressWarnings("rawtypes") GenericContainer container) {
+ public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
Unreliables.retryUntilTrue(Ints.checkedCast(timeout.getSeconds()), TimeUnit.SECONDS, () -> {
try {
- return container
+ return cassandraContainer
.execInContainer("cqlsh", "-e", "show host")
.getStdout()
.contains("Connected to Test Cluster");
@@ -59,6 +62,6 @@ public class CassandraWaitStrategy implements WaitStrategy {
@Override
public WaitStrategy withStartupTimeout(Duration startupTimeout) {
- return new CassandraWaitStrategy(startupTimeout);
+ return new CassandraWaitStrategy(cassandraContainer, startupTimeout);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
index 126d31f..327fa78 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
@@ -89,8 +89,9 @@ public class DockerCassandraRule implements TestRule {
.withCreateContainerCmdModifier(cmd -> cmd.getHostConfig().withBinds(new Binds(new Bind(tmpFsName, new Volume("/var/lib/cassandra")))))
.withCreateContainerCmdModifier(cmd -> cmd.withMemory(2000 * 1024 * 1024L))
.withExposedPorts(CASSANDRA_PORT)
- .withLogConsumer(this::displayDockerLog)
- .waitingFor(new CassandraWaitStrategy());
+ .withLogConsumer(this::displayDockerLog);
+ cassandraContainer
+ .waitingFor(new CassandraWaitStrategy(cassandraContainer));
}
private void displayDockerLog(OutputFrame outputFrame) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
----------------------------------------------------------------------
diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
index 8bb7733..ec6b776 100644
--- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
+++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.james.util.docker.Images;
import org.apache.james.util.docker.SwarmGenericContainer;
import org.junit.rules.ExternalResource;
-import org.testcontainers.containers.wait.Wait;
+import org.testcontainers.containers.wait.strategy.Wait;
import com.google.common.primitives.Ints;
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fd8a14..75c3a58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -642,7 +642,7 @@
<jetty.version>9.4.7.v20170914</jetty.version>
<cassandra-unit.version>2.1.9.2</cassandra-unit.version>
<assertj-guava.version>3.1.0</assertj-guava.version>
- <testcontainers-version>1.4.2</testcontainers-version>
+ <testcontainers-version>1.7.3</testcontainers-version>
<metrics.version>3.2.1</metrics.version>
<joda.version>2.9.4</joda.version>
<assertj.version>3.3.0</assertj.version>
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java b/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
index 20c66dd..e9fd984 100644
--- a/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
index 68bbfe4..46100bc 100644
--- a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
+++ b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
@@ -52,7 +52,7 @@ public class SpamAssassinExtension implements BeforeEachCallback, AfterEachCallb
.withFileFromClasspath("spamd.sh", "docker/spamassassin/spamd.sh")
.withFileFromClasspath("rule-update.sh", "docker/spamassassin/rule-update.sh")
.withFileFromClasspath("bayes_pg.sql", "docker/spamassassin/bayes_pg.sql"));
- spamAssassinContainer.waitingFor(new SpamAssassinWaitStrategy());
+ spamAssassinContainer.waitingFor(new SpamAssassinWaitStrategy(spamAssassinContainer));
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
index 108d0d4..1341256 100644
--- a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
+++ b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
@@ -25,28 +25,31 @@ import java.util.concurrent.TimeUnit;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
import com.google.common.primitives.Ints;
public class SpamAssassinWaitStrategy implements WaitStrategy {
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
+ private final GenericContainer<?> spamAssassinContainer;
private Duration timeout = DEFAULT_TIMEOUT;
- public SpamAssassinWaitStrategy() {
- this(DEFAULT_TIMEOUT);
+ public SpamAssassinWaitStrategy(GenericContainer<?> spamAssassinContainer) {
+ this(spamAssassinContainer, DEFAULT_TIMEOUT);
}
- public SpamAssassinWaitStrategy(Duration timeout) {
+ public SpamAssassinWaitStrategy(GenericContainer<?> spamAssassinContainer, Duration timeout) {
+ this.spamAssassinContainer = spamAssassinContainer;
this.timeout = timeout;
}
@Override
- public void waitUntilReady(@SuppressWarnings("rawtypes") GenericContainer container) {
+ public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
Unreliables.retryUntilTrue(Ints.checkedCast(timeout.getSeconds()), TimeUnit.SECONDS, () -> {
try {
- return container
+ return spamAssassinContainer
.execInContainer("spamassassin", "-V")
.getStdout()
.contains("SpamAssassin version 3.4.1");
@@ -59,6 +62,6 @@ public class SpamAssassinWaitStrategy implements WaitStrategy {
@Override
public WaitStrategy withStartupTimeout(Duration startupTimeout) {
- return new SpamAssassinWaitStrategy(startupTimeout);
+ return new SpamAssassinWaitStrategy(spamAssassinContainer, startupTimeout);
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
----------------------------------------------------------------------
diff --git a/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java b/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
index 4a6fb61..992b4bf 100644
--- a/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
+++ b/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
@@ -20,7 +20,7 @@ package org.apache.james.user.ldap;
import org.apache.james.util.docker.SwarmGenericContainer;
import org.junit.rules.ExternalResource;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
----------------------------------------------------------------------
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
index 8ed9729..274b0e8 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
@@ -51,7 +51,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
public class SpamAssassinTest {
private static final String SPAM_CONTENT = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X";
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
index bcc8a18..6e3d02b 100644
--- a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
+++ b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
@@ -30,7 +30,7 @@ import org.apache.james.util.docker.Images;
import org.apache.james.util.docker.SwarmGenericContainer;
import org.junit.Rule;
import org.junit.Test;
-import org.testcontainers.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
public class ContainerTest {
http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
----------------------------------------------------------------------
diff --git a/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java b/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
index a0694d2..4b5fc3a 100644
--- a/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
+++ b/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
@@ -31,7 +31,7 @@ import org.apache.james.util.docker.SwarmGenericContainer;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import com.jayway.awaitility.core.ConditionFactory;
import com.jayway.restassured.builder.RequestSpecBuilder;
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[17/17] james-project git commit: JAMES-2334 Add node killing tests
for RabbitMQ cluster
Posted by ma...@apache.org.
JAMES-2334 Add node killing tests for RabbitMQ cluster
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/552e44d0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/552e44d0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/552e44d0
Branch: refs/heads/master
Commit: 552e44d073580b2565d163e007b3a3144c671bcc
Parents: 00adfa8
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:50:27 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../DockerClusterRabbitMQExtension.java | 120 ++++++++
.../DockerClusterRabbitMQExtention.java | 112 --------
.../james/queue/rabbitmq/InMemoryConsumer.java | 14 +-
.../queue/rabbitmq/RabbitMQClusterTest.java | 281 ++++++++++++++-----
4 files changed, 345 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
new file mode 100644
index 0000000..000fef6
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
@@ -0,0 +1,120 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.google.common.collect.ImmutableList;
+import com.rabbitmq.client.Address;
+
+public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+ public static final String RABBIT_1 = "rabbit1";
+ public static final String RABBIT_2 = "rabbit2";
+ public static final String RABBIT_3 = "rabbit3";
+ private DockerRabbitMQCluster cluster;
+ private Network network;
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ String cookie = DigestUtils.sha1Hex("secret cookie here");
+
+ network = Network.NetworkImpl.builder()
+ .enableIpv6(false)
+ .createNetworkCmdModifiers(ImmutableList.of())
+ .build();
+
+ DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
+ DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
+ DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
+
+ rabbitMQ1.start();
+ rabbitMQ2.start();
+ rabbitMQ3.start();
+
+ rabbitMQ2.join(rabbitMQ1);
+ rabbitMQ3.join(rabbitMQ1);
+
+ rabbitMQ2.startApp();
+ rabbitMQ3.startApp();
+
+ cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ cluster.stop();
+ network.close();
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return cluster;
+ }
+
+ public static class DockerRabbitMQCluster {
+
+ private final DockerRabbitMQ rabbitMQ1;
+ private final DockerRabbitMQ rabbitMQ2;
+ private final DockerRabbitMQ rabbitMQ3;
+
+ public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+ this.rabbitMQ1 = rabbitMQ1;
+ this.rabbitMQ2 = rabbitMQ2;
+ this.rabbitMQ3 = rabbitMQ3;
+ }
+
+ public void stop() {
+ rabbitMQ1.stop();
+ rabbitMQ2.stop();
+ rabbitMQ3.stop();
+ }
+
+ public DockerRabbitMQ getRabbitMQ1() {
+ return rabbitMQ1;
+ }
+
+ public DockerRabbitMQ getRabbitMQ2() {
+ return rabbitMQ2;
+ }
+
+ public DockerRabbitMQ getRabbitMQ3() {
+ return rabbitMQ3;
+ }
+
+ public ImmutableList<Address> getAddresses() {
+ return ImmutableList.of(
+ new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
+ new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
+ new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
deleted file mode 100644
index fae2016..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.testcontainers.containers.Network;
-
-import com.google.common.collect.ImmutableList;
-
-public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
-
- public static final String RABBIT_1 = "rabbit1";
- public static final String RABBIT_2 = "rabbit2";
- public static final String RABBIT_3 = "rabbit3";
- private DockerRabbitMQCluster cluster;
- private Network network;
-
- @Override
- public void beforeEach(ExtensionContext context) throws Exception {
- String cookie = DigestUtils.sha1Hex("secret cookie here");
-
- network = Network.NetworkImpl.builder()
- .enableIpv6(false)
- .createNetworkCmdModifiers(ImmutableList.of())
- .build();
-
- DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
- DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
- DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
-
- rabbitMQ1.start();
- rabbitMQ2.start();
- rabbitMQ3.start();
-
- rabbitMQ2.join(rabbitMQ1);
- rabbitMQ3.join(rabbitMQ1);
-
- rabbitMQ2.startApp();
- rabbitMQ3.startApp();
-
- cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
- }
-
- @Override
- public void afterEach(ExtensionContext context) throws Exception {
- cluster.stop();
- network.close();
- }
-
- @Override
- public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
- return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
- }
-
- @Override
- public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
- return cluster;
- }
-
- public static class DockerRabbitMQCluster {
-
- private final DockerRabbitMQ rabbitMQ1;
- private final DockerRabbitMQ rabbitMQ2;
- private final DockerRabbitMQ rabbitMQ3;
-
- public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
- this.rabbitMQ1 = rabbitMQ1;
- this.rabbitMQ2 = rabbitMQ2;
- this.rabbitMQ3 = rabbitMQ3;
- }
-
- public void stop() {
- rabbitMQ1.stop();
- rabbitMQ2.stop();
- rabbitMQ3.stop();
- }
-
- public DockerRabbitMQ getRabbitMQ1() {
- return rabbitMQ1;
- }
-
- public DockerRabbitMQ getRabbitMQ2() {
- return rabbitMQ2;
- }
-
- public DockerRabbitMQ getRabbitMQ3() {
- return rabbitMQ3;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index f5f8fa2..6dd29af 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -30,15 +30,27 @@ import com.rabbitmq.client.Envelope;
public class InMemoryConsumer extends DefaultConsumer {
+ @FunctionalInterface
+ interface Operation {
+ void perform();
+ }
+
private final ConcurrentLinkedQueue<Integer> messages;
+ private final Operation operation;
public InMemoryConsumer(Channel channel) {
+ this(channel, () -> {});
+ }
+
+ public InMemoryConsumer(Channel channel, Operation operation) {
super(channel);
- messages = new ConcurrentLinkedQueue<>();
+ this.operation = operation;
+ this.messages = new ConcurrentLinkedQueue<>();
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ operation.perform();
Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
messages.add(payload);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 4a7dd07..d3f2cc1 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -30,116 +30,259 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-@ExtendWith(DockerClusterRabbitMQExtention.class)
+@ExtendWith(DockerClusterRabbitMQExtension.class)
class RabbitMQClusterTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);
+
private static final String QUEUE = "queue";
- private Connection node1Connection;
- private Channel node1Channel;
- private Connection node2Connection;
- private Channel node2Channel;
-
- @BeforeEach
- void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
- ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
- node1Connection = node1ConnectionFactory.newConnection();
- node2Connection = node2ConnectionFactory.newConnection();
- node1Channel = node1Connection.createChannel();
- node2Channel = node2Connection.createChannel();
- }
+ @Nested
+ class ClusterSharing {
+
+ private ConnectionFactory node1ConnectionFactory;
+ private ConnectionFactory node2ConnectionFactory;
+ private Connection node1Connection;
+ private Connection node2Connection;
+ private Channel node1Channel;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node1Connection = node1ConnectionFactory.newConnection();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node1Channel = node1Connection.createChannel();
+ node2Channel = node2Connection.createChannel();
+ }
+
+ @AfterEach
+ void tearDown() {
+ closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+ }
+
+ @Test
+ void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+ String stdout = cluster.getRabbitMQ1().container()
+ .execInContainer("rabbitmqctl", "cluster_status")
+ .getStdout();
+
+ assertThat(stdout)
+ .contains(
+ DockerClusterRabbitMQExtension.RABBIT_1,
+ DockerClusterRabbitMQExtension.RABBIT_2,
+ DockerClusterRabbitMQExtension.RABBIT_3);
+ }
+
+ @Test
+ void queuesShouldBeShared() throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ @Test
+ void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
+
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- @AfterEach
- void tearDown() {
- closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
}
- private void closeQuietly(AutoCloseable... closeables) {
- for (AutoCloseable closeable : closeables) {
+ @Nested
+ class ClusterNodesFailure {
+
+ private ConnectionFactory node1ConnectionFactory;
+ private Connection resilientConnection;
+ private Channel resilientChannel;
+ private Connection node2Connection;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses());
+ resilientChannel = resilientConnection.createChannel();
+ ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node2Channel = node2Connection.createChannel();
+ }
+
+ @AfterEach
+ void tearDown() {
+ closeQuietly(resilientConnection, resilientChannel);
+ }
+
+ @Disabled("For some reason, we are unable to recover topology when reconnecting")
+ @Test
+ void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception {
+ resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ int nbMessages = 20;
+ int firstBatchSize = nbMessages / 2;
+ IntStream.range(0, firstBatchSize)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);
+
+ cluster.getRabbitMQ1().stop();
+
+ IntStream.range(firstBatchSize, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(this::tryPublishWithRetry);
+
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ private void tryPublishWithRetry(byte[] bytes) {
+ Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes));
+ }
+
+ private boolean tryPublish(byte[] bytes) {
try {
- closeable.close();
+ resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes);
+ return true;
} catch (Exception e) {
- //ignoring exception
+ LOGGER.error("failed publish", e);
+ return false;
}
}
- }
- @Test
- void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
- String stdout = cluster.getRabbitMQ1().container()
- .execInContainer("rabbitmqctl", "cluster_status")
- .getStdout();
-
- assertThat(stdout)
- .contains(
- DockerClusterRabbitMQExtention.RABBIT_1,
- DockerClusterRabbitMQExtention.RABBIT_2,
- DockerClusterRabbitMQExtention.RABBIT_3);
- }
+ @Test
+ void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception {
+ ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory();
+ cluster.getRabbitMQ3().stop();
- @Test
- void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
- node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses());
+ Channel channel = connection.createChannel()) {
- int nbMessages = 10;
- IntStream.range(0, nbMessages)
- .mapToObj(i -> asBytes(String.valueOf(i)))
- .forEach(Throwing.consumer(
- bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
- node2Channel.basicConsume(QUEUE, consumer2);
+ InMemoryConsumer consumer = new InMemoryConsumer(channel);
+ channel.basicConsume(QUEUE, consumer);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
- List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
- assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
- }
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+ }
- @Test
- void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
- node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ @Test
+ void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception {
+ node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- int nbMessages = 10;
- IntStream.range(0, nbMessages)
- .mapToObj(i -> asBytes(String.valueOf(i)))
- .forEach(Throwing.consumer(
- bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
- node2Channel.basicConsume(QUEUE, consumer2);
+ AtomicInteger counter = new AtomicInteger(0);
+ InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
+ () -> {
+ if (counter.incrementAndGet() == nbMessages / 2) {
+ cluster.getRabbitMQ1().stop();
+ }
+ });
+ resilientChannel.basicConsume(QUEUE, consumer);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+ awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+ assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
- assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ Arrays.stream(closeables).forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignore error
+ }
}
private byte[] asBytes(String message) {
return message.getBytes(StandardCharsets.UTF_8);
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[14/17] james-project git commit: JAMES-2334 Adding logs to rabbitMQ
docker
Posted by ma...@apache.org.
JAMES-2334 Adding logs to rabbitMQ docker
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8767d00b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8767d00b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8767d00b
Branch: refs/heads/master
Commit: 8767d00bfb5a53a0e71e6f1f51b57d473ffb7e92
Parents: fb67d5f
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 11:03:40 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
server/queue/queue-rabbitmq/pom.xml | 9 ++++++++
.../james/queue/rabbitmq/DockerRabbitMQ.java | 6 ++++-
.../src/test/resources/logback-test.xml | 23 ++++++++++++++++++++
3 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 3df0fb6..d7d8ad7 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -78,5 +78,14 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 8c7bae8..8fea5b0 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,12 +18,15 @@
****************************************************************/
package org.apache.james.queue.rabbitmq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import com.rabbitmq.client.ConnectionFactory;
public class DockerRabbitMQ {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
private static final int DEFAULT_RABBITMQ_PORT = 5672;
private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
@@ -37,7 +40,8 @@ public class DockerRabbitMQ {
container = new GenericContainer<>("rabbitmq:3.7.5")
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
.withExposedPorts(DEFAULT_RABBITMQ_PORT)
- .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this));
+ .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this))
+ .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()));
}
public String getHostIp() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..fa5a922
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+ <resetJUL>true</resetJUL>
+ </contextListener>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
+ <immediateFlush>false</immediateFlush>
+ </encoder>
+ </appender>
+
+ <root level="ERROR">
+ <appender-ref ref="CONSOLE" />
+ </root>
+
+ <logger name="org.testcontainers" level="ERROR"/>
+ <logger name="org.apache.james" level="DEBUG"/>
+ <logger name="org.apache.james.queue.rabbitmq.DockerRabbitMQ" level="DEBUG"/>
+
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[02/17] james-project git commit: JAMES-2334 Enforce common image
usage for RabbitMQ
Posted by ma...@apache.org.
JAMES-2334 Enforce common image usage for RabbitMQ
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/21eec770
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/21eec770
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/21eec770
Branch: refs/heads/master
Commit: 21eec7701198a232ed9bd17b55a88f4dac083dbb
Parents: 552e44d
Author: benwa <bt...@linagora.com>
Authored: Tue May 29 10:10:50 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../src/test/java/org/apache/james/util/docker/Images.java | 2 +-
server/queue/queue-rabbitmq/pom.xml | 6 ++++++
.../java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java | 3 ++-
3 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java b/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
index 7b75b45..549cf02 100644
--- a/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
@@ -21,7 +21,7 @@ package org.apache.james.util.docker;
public interface Images {
String FAKE_SMTP = "weave/rest-smtp-sink:latest";
- String RABBITMQ = "rabbitmq:3";
+ String RABBITMQ = "rabbitmq:3.7.5";
String ELASTICSEARCH = "elasticsearch:2.2.2";
String NGINX = "nginx:1.7.1";
String TIKA = "logicalspark/docker-tikaserver:1.15rc2";
http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 86bbfa2..c2fff5b 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -34,6 +34,12 @@
<dependencies>
<dependency>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-server-util-java8</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.github.fge</groupId>
<artifactId>throwing-lambdas</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index a964b99..d890c2b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -20,6 +20,7 @@ package org.apache.james.queue.rabbitmq;
import java.util.Optional;
+import org.apache.james.util.docker.Images;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
@@ -52,7 +53,7 @@ public class DockerRabbitMQ {
@SuppressWarnings("resource")
private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
- container = new GenericContainer<>("rabbitmq:3.7.5")
+ container = new GenericContainer<>(Images.RABBITMQ)
.withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse("localhost")))
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
.withExposedPorts(DEFAULT_RABBITMQ_PORT)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[07/17] james-project git commit: JAMES-2334 Upgrade amqp client
version
Posted by ma...@apache.org.
JAMES-2334 Upgrade amqp client version
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/61e94c6a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/61e94c6a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/61e94c6a
Branch: refs/heads/master
Commit: 61e94c6aae7b87d2c663154d0137930e2044c4cc
Parents: 8d568fa
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:16:17 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/61e94c6a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75c3a58..0be0c28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1704,7 +1704,7 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
- <version>4.0.0</version>
+ <version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.sparkjava</groupId>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[09/17] james-project git commit: JAMES-2334 Demonstrate that
broadcast is working
Posted by ma...@apache.org.
JAMES-2334 Demonstrate that broadcast is working
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9f78f0d7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9f78f0d7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9f78f0d7
Branch: refs/heads/master
Commit: 9f78f0d75f5690b6680b44972423966629a3296a
Parents: 30679c4
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 10:01:45 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
server/queue/queue-rabbitmq/pom.xml | 8 +
.../james/queue/rabbitmq/RabbitMQTest.java | 145 ++++++++++++++++---
2 files changed, 131 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 27b0c84..3df0fb6 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -34,6 +34,14 @@
<dependencies>
<dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>throwing-lambdas</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.github.steveash.guavate</groupId>
+ <artifactId>guavate</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 465be65..cad0303 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -25,51 +25,152 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
@ExtendWith(DockerRabbitMQExtension.class)
class RabbitMQTest {
private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
- @Test
- void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
- ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
- try (Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel()) {
- String queueName = createQueue(channel);
+ @Nested
+ class SingleConsumerTest {
- publishAMessage(channel);
+ @Test
+ void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
+ ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+ try (Connection connection = connectionFactory.newConnection();
+ Channel channel = connection.createChannel()) {
+ String queueName = createQueue(channel);
- awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+ publishAMessage(channel);
+
+ awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+ }
}
- }
- private String createQueue(Channel channel) throws IOException {
- channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
- return queueName;
- }
+ private String createQueue(Channel channel) throws IOException {
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ String queueName = channel.queueDeclare().getQueue();
+ channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
+ return queueName;
+ }
+
+ private void publishAMessage(Channel channel) throws IOException {
+ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+ }
+
+ private Boolean messageReceived(Channel channel, String queueName) {
+ try {
+ return channel.basicGet(queueName, !AUTO_ACK) != null;
+ } catch (Exception e) {
+ return false;
+ }
+ }
- private void publishAMessage(Channel channel) throws IOException {
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
}
- private Boolean messageReceived(Channel channel, String queueName) {
- try {
- return channel.basicGet(queueName, !AUTO_ACK) != null;
- } catch (Exception e) {
- return false;
+ @Nested
+ class BroadcastTest {
+
+ private ConnectionFactory connectionFactory1;
+ private ConnectionFactory connectionFactory2;
+ private ConnectionFactory connectionFactory3;
+ private ConnectionFactory connectionFactory4;
+
+ @BeforeEach
+ public void setup(DockerRabbitMQ rabbitMQ) {
+ connectionFactory1 = rabbitMQ.connectionFactory();
+ connectionFactory2 = rabbitMQ.connectionFactory();
+ connectionFactory3 = rabbitMQ.connectionFactory();
+ connectionFactory4 = rabbitMQ.connectionFactory();
+ }
+
+ // In the following case, each consumer will receive the messages produced by the
+ // producer
+ // To do so, each consumer will bind it's queue to the producer exchange.
+ @Test
+ public void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
+ ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
+ ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>();
+
+ try (Connection connection1 = connectionFactory1.newConnection();
+ Channel publisherChannel = connection1.createChannel();
+ Connection connection2 = connectionFactory2.newConnection();
+ Channel subscriberChannel2 = connection2.createChannel();
+ Connection connection3 = connectionFactory3.newConnection();
+ Channel subscriberChannel3 = connection3.createChannel();
+ Connection connection4 = connectionFactory4.newConnection();
+ Channel subscriberChannel4 = connection4.createChannel()) {
+
+ // Declare a single exchange and three queues attached to it.
+ publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+ String queue2 = subscriberChannel2.queueDeclare().getQueue();
+ subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+ String queue3 = subscriberChannel3.queueDeclare().getQueue();
+ subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+ String queue4 = subscriberChannel4.queueDeclare().getQueue();
+ subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+ subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2));
+ subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3));
+ subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4));
+
+ // the publisher will produce 10 messages
+ IntStream.range(0, 10)
+ .mapToObj(String::valueOf)
+ .map(s -> s.getBytes(StandardCharsets.UTF_8))
+ .forEach(Throwing.consumer(
+ bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+ awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4));
+
+ // Check every subscriber have receive all the messages.
+ assertThat(results2).containsOnlyElementsOf(expectedResult);
+ assertThat(results3).containsOnlyElementsOf(expectedResult);
+ assertThat(results4).containsOnlyElementsOf(expectedResult);
+ }
+ }
+
+ private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) {
+ return Iterables.size(
+ Iterables.concat(results2, results3, results4))
+ == expectedResult.size() * 3;
+ }
+
+ private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) {
+ return new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
+ results.add(payload);
+ }
+ };
}
}
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[06/17] james-project git commit: JAMES-2334 Provide a JUnit
extension for RabbitMQ in Docker
Posted by ma...@apache.org.
JAMES-2334 Provide a JUnit extension for RabbitMQ in Docker
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8daf8951
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8daf8951
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8daf8951
Branch: refs/heads/master
Commit: 8daf8951bfaeb1e23a8e2bfda749d516799a2edd
Parents: 5041a0e
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:14:57 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
server/queue/queue-rabbitmq/pom.xml | 33 +++++++++
.../james/queue/rabbitmq/DockerRabbitMQ.java | 74 ++++++++++++++++++++
.../queue/rabbitmq/DockerRabbitMQExtension.java | 55 +++++++++++++++
.../rabbitmq/DockerRabbitMQExtensionTest.java | 50 +++++++++++++
.../queue/rabbitmq/RabbitMQWaitStrategy.java | 67 ++++++++++++++++++
5 files changed, 279 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 950089f..85d888d 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -33,5 +33,38 @@
<name>Apache James :: Server :: Mail Queue :: RabbitMQ</name>
<dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-launcher</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
new file mode 100644
index 0000000..0eab7a2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.testcontainers.containers.GenericContainer;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+public class DockerRabbitMQ {
+
+ private static final int DEFAULT_RABBITMQ_PORT = 5672;
+ private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
+ private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
+ private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
+
+ private GenericContainer<?> container;
+
+ @SuppressWarnings("resource")
+ public DockerRabbitMQ() {
+ container = new GenericContainer<>("rabbitmq:3.7.5")
+ .withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
+ .withExposedPorts(DEFAULT_RABBITMQ_PORT)
+ .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this));
+ }
+
+ public String getHostIp() {
+ return container.getContainerIpAddress();
+ }
+
+ public Integer getPort() {
+ return container.getMappedPort(DEFAULT_RABBITMQ_PORT);
+ }
+
+ public String getUsername() {
+ return DEFAULT_RABBITMQ_USERNAME;
+ }
+
+ public String getPassword() {
+ return DEFAULT_RABBITMQ_PASSWORD;
+ }
+
+ public ConnectionFactory connectionFactory() {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setHost(getHostIp());
+ connectionFactory.setPort(getPort());
+ connectionFactory.setUsername(getUsername());
+ connectionFactory.setPassword(getPassword());
+ return connectionFactory;
+ }
+
+ public void start() {
+ container.start();
+ }
+
+ public void stop() {
+ container.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
new file mode 100644
index 0000000..925ff08
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
@@ -0,0 +1,55 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+ private DockerRabbitMQ rabbitMQ;
+
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ rabbitMQ = new DockerRabbitMQ();
+ rabbitMQ.start();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) {
+ rabbitMQ.stop();
+ }
+
+ @Override
+ public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
+ }
+
+ @Override
+ public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+ return rabbitMQ;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
new file mode 100644
index 0000000..f2a0cea
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+public class DockerRabbitMQExtensionTest {
+
+ private ConnectionFactory connectionFactory;
+
+ @BeforeEach
+ public void setup(DockerRabbitMQ rabbitMQ) {
+ connectionFactory = new ConnectionFactory();
+ connectionFactory.setHost(rabbitMQ.getHostIp());
+ connectionFactory.setPort(rabbitMQ.getPort());
+ connectionFactory.setUsername(rabbitMQ.getUsername());
+ connectionFactory.setPassword(rabbitMQ.getPassword());
+ }
+
+ @Test
+ public void containerShouldBeUp() throws Exception {
+ try (Connection connection = connectionFactory.newConnection()) {
+ assertThat(connection.isOpen()).isTrue();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
new file mode 100644
index 0000000..ba6ce3b
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
+
+import com.google.common.primitives.Ints;
+import com.rabbitmq.client.Connection;
+
+public class RabbitMQWaitStrategy implements WaitStrategy {
+
+ private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
+
+ public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) {
+ return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT);
+ }
+
+ private final DockerRabbitMQ rabbitMQ;
+ private final Duration timeout;
+
+ public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) {
+ this.rabbitMQ = rabbitMQ;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
+ int seconds = Ints.checkedCast(this.timeout.getSeconds());
+
+ Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected);
+ }
+
+ private Boolean isConnected() throws IOException, TimeoutException {
+ try (Connection connection = rabbitMQ.connectionFactory().newConnection()) {
+ return connection.isOpen();
+ }
+ }
+
+ @Override
+ public WaitStrategy withStartupTimeout(Duration startupTimeout) {
+ return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[08/17] james-project git commit: JAMES-2334 Demonstrate that
published messages are not lost without consumer
Posted by ma...@apache.org.
JAMES-2334 Demonstrate that published messages are not lost without consumer
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/30679c47
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/30679c47
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/30679c47
Branch: refs/heads/master
Commit: 30679c47140f93d402bace10caaa7221c94690ef
Parents: 8daf895
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:44:03 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
server/queue/queue-rabbitmq/pom.xml | 4 ++
.../james/queue/rabbitmq/RabbitMQFixture.java | 46 ++++++++++++
.../james/queue/rabbitmq/RabbitMQTest.java | 75 ++++++++++++++++++++
3 files changed, 125 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 85d888d..27b0c84 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -38,6 +38,10 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>com.jayway.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
new file mode 100644
index 0000000..e216690
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static com.jayway.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static com.jayway.awaitility.Duration.ONE_MINUTE;
+
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
+import com.jayway.awaitility.core.ConditionFactory;
+import com.rabbitmq.client.AMQP;
+
+public class RabbitMQFixture {
+ public static final boolean DURABLE = true;
+ public static final boolean AUTO_ACK = true;
+ public static final AMQP.BasicProperties NO_PROPERTIES = null;
+ public static final String EXCHANGE_NAME = "exchangeName";
+ public static final String ROUTING_KEY = "routingKey";
+ public static final String DIRECT = "direct";
+
+ public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS;
+ public static ConditionFactory calmlyAwait = Awaitility.with()
+ .pollInterval(slowPacedPollInterval)
+ .and()
+ .with()
+ .pollDelay(slowPacedPollInterval)
+ .await();
+ public static ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
new file mode 100644
index 0000000..465be65
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+class RabbitMQTest {
+
+ private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
+
+ @Test
+ void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
+ ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+ try (Connection connection = connectionFactory.newConnection();
+ Channel channel = connection.createChannel()) {
+ String queueName = createQueue(channel);
+
+ publishAMessage(channel);
+
+ awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+ }
+ }
+
+ private String createQueue(Channel channel) throws IOException {
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ String queueName = channel.queueDeclare().getQueue();
+ channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
+ return queueName;
+ }
+
+ private void publishAMessage(Channel channel) throws IOException {
+ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+ }
+
+ private Boolean messageReceived(Channel channel, String queueName) {
+ try {
+ return channel.basicGet(queueName, !AUTO_ACK) != null;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[15/17] james-project git commit: JAMES-2334 Test cross cluster
operations
Posted by ma...@apache.org.
JAMES-2334 Test cross cluster operations
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/00adfa82
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/00adfa82
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/00adfa82
Branch: refs/heads/master
Commit: 00adfa82472cf1ed8e4cc04a5d386c397cda9801
Parents: 749e641
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:30:14 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200
----------------------------------------------------------------------
.../queue/rabbitmq/RabbitMQClusterTest.java | 121 ++++++++++---------
1 file changed, 66 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/00adfa82/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index f88ecf4..4a7dd07 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -23,17 +23,21 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -46,15 +50,36 @@ import com.rabbitmq.client.ConnectionFactory;
@ExtendWith(DockerClusterRabbitMQExtention.class)
class RabbitMQClusterTest {
- public static final String QUEUE = "queue";
+ private static final String QUEUE = "queue";
+
+ private Connection node1Connection;
+ private Channel node1Channel;
+ private Connection node2Connection;
+ private Channel node2Channel;
+
+ @BeforeEach
+ void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+ ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+ ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+ node1Connection = node1ConnectionFactory.newConnection();
+ node2Connection = node2ConnectionFactory.newConnection();
+ node1Channel = node1Connection.createChannel();
+ node2Channel = node2Connection.createChannel();
+ }
@AfterEach
- public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
- cluster.getRabbitMQ2()
- .connectionFactory()
- .newConnection()
- .createChannel()
- .queueDelete(QUEUE);
+ void tearDown() {
+ closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) {
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ //ignoring exception
+ }
+ }
}
@Test
@@ -71,64 +96,50 @@ class RabbitMQClusterTest {
}
@Test
- public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
- ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+ void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- try (Connection connection = connectionFactory1.newConnection();
- Channel channel = connection.createChannel()) {
-
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
- channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
-
- MESSAGES_AS_BYTES.forEach(Throwing.consumer(
- bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- }
- try (Connection connection2 = connectionFactory2.newConnection();
- Channel channel2 = connection2.createChannel()) {
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
- InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
- channel2.basicConsume(QUEUE, consumer2);
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
-
- assertThat(consumer2.getConsumedMessages())
- .containsOnlyElementsOf(MESSAGES);
- }
-
- assertThat(result)
- .containsOnlyElementsOf(MESSAGES);
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
}
@Test
- public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
- ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
- ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+ void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+ node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+ node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+ node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
- try (Connection connection = connectionFactory1.newConnection();
- Channel channel = connection.createChannel();
- Connection connection2 = connectionFactory2.newConnection();
- Channel channel2 = connection2.createChannel()) {
+ int nbMessages = 10;
+ IntStream.range(0, nbMessages)
+ .mapToObj(i -> asBytes(String.valueOf(i)))
+ .forEach(Throwing.consumer(
+ bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
- channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
- channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
- channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+ InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+ node2Channel.basicConsume(QUEUE, consumer2);
- MESSAGES_AS_BYTES.forEach(Throwing.consumer(
- bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
- InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
- channel2.basicConsume(QUEUE, consumer2);
-
- awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+ List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+ assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+ }
- assertThat(consumer2.getConsumedMessages())
- .containsOnlyElementsOf(MESSAGES);
- }
+ private byte[] asBytes(String message) {
+ return message.getBytes(StandardCharsets.UTF_8);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org