You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2018/05/31 07:47:40 UTC

[01/17] james-project git commit: JAMES-2334 checkstyle fixes

Repository: james-project
Updated Branches:
  refs/heads/master 10dea04b2 -> 0fdb59c41


JAMES-2334 checkstyle fixes


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0fdb59c4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0fdb59c4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0fdb59c4

Branch: refs/heads/master
Commit: 0fdb59c41cb458bea44cf27c05d9b32e80acf3fc
Parents: c7d08bb
Author: Matthieu Baechler <ma...@apache.org>
Authored: Tue May 29 14:37:26 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/mailbox/model/QuotaRatioTest.java       | 18 +++++++++---------
 .../elasticsearch/json/QuotaRatioAsJsonTest.java  |  4 ++--
 .../MemoryQuotaSearchTestSystemExtension.java     |  2 +-
 .../james/queue/rabbitmq/InMemoryConsumer.java    |  2 +-
 4 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
----------------------------------------------------------------------
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
index 182caf6..3940068 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/model/QuotaRatioTest.java
@@ -27,50 +27,50 @@ import org.junit.jupiter.api.Test;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 
-public class QuotaRatioTest {
+class QuotaRatioTest {
 
-    private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize> builder()
+    private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize>builder()
             .used(QuotaSize.size(15))
             .computedLimit(QuotaSize.size(60))
             .build();
-    private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount> builder()
+    private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount>builder()
             .used(QuotaCount.count(1))
             .computedLimit(QuotaCount.count(2))
             .build();
 
     @Test
-    public void shouldMatchBeanContact() {
+    void shouldMatchBeanContact() {
         EqualsVerifier.forClass(QuotaRatio.class)
             .allFieldsShouldBeUsed()
             .verify();
     }
 
     @Test
-    public void quotaRatioShouldThrowWhenQuotaSizeIsNull() {
+    void quotaRatioShouldThrowWhenQuotaSizeIsNull() {
         assertThatThrownBy(() -> QuotaRatio.from(null, QUOTA_COUNT))
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    public void quotaRatioShouldThrowWhenQuotaCountIsNull() {
+    void quotaRatioShouldThrowWhenQuotaCountIsNull() {
         assertThatThrownBy(() -> QuotaRatio.from(QUOTA_SIZE, null))
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    public void quotaSizeShouldReturnTheQuotaSize() {
+    void quotaSizeShouldReturnTheQuotaSize() {
         QuotaRatio quotaRatio = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT);
         assertThat(quotaRatio.getQuotaSize()).isEqualTo(QUOTA_SIZE);
     }
 
     @Test
-    public void quotaCountShouldReturnTheQuotaCount() {
+    void quotaCountShouldReturnTheQuotaCount() {
         QuotaRatio quotaRatio = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT);
         assertThat(quotaRatio.getQuotaCount()).isEqualTo(QUOTA_COUNT);
     }
 
     @Test
-    public void maxShouldReturnTheMaxRatio() {
+    void maxShouldReturnTheMaxRatio() {
         double max = QuotaRatio.from(QUOTA_SIZE, QUOTA_COUNT)
                 .max();
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
index fb16937..e5046ba 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/json/QuotaRatioAsJsonTest.java
@@ -33,11 +33,11 @@ import nl.jqno.equalsverifier.EqualsVerifier;
 
 public class QuotaRatioAsJsonTest {
 
-    private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize> builder()
+    private static final Quota<QuotaSize> QUOTA_SIZE = Quota.<QuotaSize>builder()
             .used(QuotaSize.size(15))
             .computedLimit(QuotaSize.size(60))
             .build();
-    private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount> builder()
+    private static final Quota<QuotaCount> QUOTA_COUNT = Quota.<QuotaCount>builder()
             .used(QuotaCount.count(1))
             .computedLimit(QuotaCount.count(2))
             .build();

http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
index e6c8f05..12394e7 100644
--- a/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-scanning/src/test/java/org/apache/james/quota/search/scanning/MemoryQuotaSearchTestSystemExtension.java
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.extension.ParameterResolver;
 
 public class MemoryQuotaSearchTestSystemExtension implements ParameterResolver {
 
-    private static final Runnable NO_AWAIT = () -> {};
+    private static final Runnable NO_AWAIT = () -> { };
 
     @Override
     public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/0fdb59c4/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index 6dd29af..211a9ae 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -39,7 +39,7 @@ public class InMemoryConsumer extends DefaultConsumer {
     private final Operation operation;
 
     public InMemoryConsumer(Channel channel) {
-        this(channel, () -> {});
+        this(channel, () -> { });
     }
 
     public InMemoryConsumer(Channel channel, Operation operation) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[10/17] james-project git commit: JAMES-2334 Demonstrate that workQueue is working

Posted by ma...@apache.org.
JAMES-2334 Demonstrate that workQueue is working


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b07fd7e0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b07fd7e0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b07fd7e0

Branch: refs/heads/master
Commit: b07fd7e0abdc60a37f41f26e8daa152d49adf596
Parents: 9f78f0d
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 10:28:01 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/InMemoryConsumer.java  |  49 ++++++
 .../james/queue/rabbitmq/RabbitMQFixture.java   |   3 +
 .../james/queue/rabbitmq/RabbitMQTest.java      | 154 +++++++++++++------
 3 files changed, 162 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
new file mode 100644
index 0000000..f5f8fa2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+
+public class InMemoryConsumer extends DefaultConsumer {
+
+    private final ConcurrentLinkedQueue<Integer> messages;
+
+    public InMemoryConsumer(Channel channel) {
+        super(channel);
+        messages = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+        Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
+        messages.add(payload);
+    }
+
+    public Queue<Integer> getConsumedMessages() {
+        return messages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
index e216690..3ed6237 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
@@ -34,6 +34,9 @@ public class RabbitMQFixture {
     public static final String EXCHANGE_NAME = "exchangeName";
     public static final String ROUTING_KEY = "routingKey";
     public static final String DIRECT = "direct";
+    public static final boolean EXCLUSIVE = true;
+    public static final boolean AUTO_DELETE = true;
+    public static final String WORK_QUEUE = "workQueue";
 
     public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS;
     public static ConditionFactory calmlyAwait = Awaitility.with()

http://git-wip-us.apache.org/repos/asf/james-project/blob/b07fd7e0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index cad0303..2463516 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -19,34 +19,38 @@
 package org.apache.james.queue.rabbitmq;
 
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.WORK_QUEUE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.DefaultConsumer;
-import com.rabbitmq.client.Envelope;
 
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQTest {
@@ -87,44 +91,65 @@ class RabbitMQTest {
                 return false;
             }
         }
-
     }
 
     @Nested
-    class BroadcastTest {
+    class FourConnections {
 
         private ConnectionFactory connectionFactory1;
         private ConnectionFactory connectionFactory2;
         private ConnectionFactory connectionFactory3;
         private ConnectionFactory connectionFactory4;
+        private Connection connection1;
+        private Connection connection2;
+        private Connection connection3;
+        private Connection connection4;
+        private Channel publisherChannel;
+        private Channel subscriberChannel2;
+        private Channel subscriberChannel3;
+        private Channel subscriberChannel4;
 
         @BeforeEach
-        public void setup(DockerRabbitMQ rabbitMQ) {
+        void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
             connectionFactory1 = rabbitMQ.connectionFactory();
             connectionFactory2 = rabbitMQ.connectionFactory();
             connectionFactory3 = rabbitMQ.connectionFactory();
             connectionFactory4 = rabbitMQ.connectionFactory();
+            connection1 = connectionFactory1.newConnection();
+            connection2 = connectionFactory2.newConnection();
+            connection3 = connectionFactory3.newConnection();
+            connection4 = connectionFactory4.newConnection();
+            publisherChannel = connection1.createChannel();
+            subscriberChannel2 = connection2.createChannel();
+            subscriberChannel3 = connection3.createChannel();
+            subscriberChannel4 = connection4.createChannel();
         }
 
-        // In the following case, each consumer will receive the messages produced by the
-        // producer
-        // To do so, each consumer will bind it's queue to the producer exchange.
-        @Test
-        public void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
-            ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
-            ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>();
-            ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>();
-            ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>();
-
-            try (Connection connection1 = connectionFactory1.newConnection();
-                 Channel publisherChannel = connection1.createChannel();
-                 Connection connection2 = connectionFactory2.newConnection();
-                 Channel subscriberChannel2 = connection2.createChannel();
-                 Connection connection3 = connectionFactory3.newConnection();
-                 Channel subscriberChannel3 = connection3.createChannel();
-                 Connection connection4 = connectionFactory4.newConnection();
-                 Channel subscriberChannel4 = connection4.createChannel()) {
+        @AfterEach
+        void tearDown() {
+            closeQuietly(
+                publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4,
+                connection1, connection2, connection3, connection4);
+        }
+
+        private void closeQuietly(AutoCloseable... closeables) {
+            for (AutoCloseable closeable : closeables) {
+                try {
+                    closeable.close();
+                } catch (Exception e) {
+                    //ignoring exception
+                }
+            }
+        }
+
+        @Nested
+        class BroadCast {
 
+            // In the following case, each consumer will receive the messages produced by the
+            // producer
+            // To do so, each consumer will bind it's queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
                 // Declare a single exchange and three queues attached to it.
                 publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
 
@@ -135,9 +160,12 @@ class RabbitMQTest {
                 String queue4 = subscriberChannel4.queueDeclare().getQueue();
                 subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
 
-                subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2));
-                subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3));
-                subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4));
+                InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
+                subscriberChannel2.basicConsume(queue2, consumer2);
+                subscriberChannel3.basicConsume(queue3, consumer3);
+                subscriberChannel4.basicConsume(queue4, consumer4);
 
                 // the publisher will produce 10 messages
                 IntStream.range(0, 10)
@@ -146,30 +174,68 @@ class RabbitMQTest {
                     .forEach(Throwing.consumer(
                         bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-                awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4));
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
 
+                ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
                 // Check every subscriber have receive all the messages.
-                assertThat(results2).containsOnlyElementsOf(expectedResult);
-                assertThat(results3).containsOnlyElementsOf(expectedResult);
-                assertThat(results4).containsOnlyElementsOf(expectedResult);
+                assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                assertThat(consumer3.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+                assertThat(consumer4.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
             }
         }
 
-        private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) {
-            return Iterables.size(
-                Iterables.concat(results2, results3, results4))
-                == expectedResult.size() * 3;
+        @Nested
+        class WorkQueue {
+
+            // In the following case, consumers will receive the messages produced by the
+            // producer but will share them.
+            // To do so, we will bind a single queue to the producer exchange.
+            @Test
+            void rabbitMQShouldSupportTheWorkQueueCase() throws Exception {
+                int nbMessages = 100;
+
+                // Declare the exchange and a single queue attached to it.
+                publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+                publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                // Publisher will produce 100 messages
+                IntStream.range(0, nbMessages)
+                    .mapToObj(String::valueOf)
+                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .forEach(Throwing.consumer(
+                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
+                subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
+                subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
+                subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+
+                awaitAtMostOneMinute.until(
+                    () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages);
+
+                ImmutableList<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+
+                assertThat(
+                    Iterables.concat(
+                        consumer2.getConsumedMessages(),
+                        consumer3.getConsumedMessages(),
+                        consumer4.getConsumedMessages()))
+                    .containsOnlyElementsOf(expectedResult);
+            }
+
         }
 
-        private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) {
-            return new DefaultConsumer(channel) {
-                @Override
-                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-                    Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
-                    results.add(payload);
-                }
-            };
+        private long countReceivedMessages(InMemoryConsumer... consumers) {
+            return Arrays.stream(consumers)
+                .map(InMemoryConsumer::getConsumedMessages)
+                .mapToLong(Queue::size)
+                .sum();
         }
+
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[13/17] james-project git commit: JAMES-2334 Test cross cluster operations

Posted by ma...@apache.org.
JAMES-2334 Test cross cluster operations

 - Pub/sub on different nodes
 - Exchange/queue binding on different nodes


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/749e6413
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/749e6413
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/749e6413

Branch: refs/heads/master
Commit: 749e64138816e220b96134389ddd5a03cac188d5
Parents: 4439551
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:20:17 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 91 ++++++++++++++++++++
 1 file changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/749e6413/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 8874641..f88ecf4 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -18,15 +18,45 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_DELETE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.concurrent.ConcurrentLinkedDeque;
+
 import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 
 @ExtendWith(DockerClusterRabbitMQExtention.class)
 class RabbitMQClusterTest {
 
+    public static final String QUEUE = "queue";
+
+    @AfterEach
+    public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
+        cluster.getRabbitMQ2()
+            .connectionFactory()
+            .newConnection()
+            .createChannel()
+            .queueDelete(QUEUE);
+    }
+
     @Test
     void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
         String stdout = cluster.getRabbitMQ1().container()
@@ -40,4 +70,65 @@ class RabbitMQClusterTest {
                 DockerClusterRabbitMQExtention.RABBIT_3);
     }
 
+    @Test
+    public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+        ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+
+        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+
+        try (Connection connection = connectionFactory1.newConnection();
+             Channel channel = connection.createChannel()) {
+
+            channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+            channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+        }
+
+        try (Connection connection2 = connectionFactory2.newConnection();
+             Channel channel2 = connection2.createChannel()) {
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+            channel2.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+            assertThat(consumer2.getConsumedMessages())
+                .containsOnlyElementsOf(MESSAGES);
+        }
+
+        assertThat(result)
+            .containsOnlyElementsOf(MESSAGES);
+    }
+
+    @Test
+    public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+
+        try (Connection connection = connectionFactory1.newConnection();
+             Channel channel = connection.createChannel();
+             Connection connection2 = connectionFactory2.newConnection();
+             Channel channel2 = connection2.createChannel()) {
+
+            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
+                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+            channel2.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+
+            assertThat(consumer2.getConsumedMessages())
+                .containsOnlyElementsOf(MESSAGES);
+        }
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[16/17] james-project git commit: JAMES-2334 Improve rabbit mq image logging

Posted by ma...@apache.org.
JAMES-2334 Improve rabbit mq image logging


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c7d08bb7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c7d08bb7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c7d08bb7

Branch: refs/heads/master
Commit: c7d08bb72e0ca8cc214ae5f66841aedad069f511
Parents: 21eec77
Author: benwa <bt...@linagora.com>
Authored: Tue May 29 10:13:44 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/DockerRabbitMQ.java       | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c7d08bb7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index d890c2b..02f89b2 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -112,19 +112,28 @@ public class DockerRabbitMQ {
     }
 
     public void join(DockerRabbitMQ rabbitMQ) throws Exception {
-        container()
+        stopApp();
+        joinCluster(rabbitMQ);
+    }
+
+    private void stopApp() throws java.io.IOException, InterruptedException {
+        String stdout = container()
             .execInContainer("rabbitmqctl", "stop_app")
             .getStdout();
-        container()
+        LOGGER.debug("stop_app: {}", stdout);
+    }
+
+    private void joinCluster(DockerRabbitMQ rabbitMQ) throws java.io.IOException, InterruptedException {
+        String stdout = container()
             .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
             .getStdout();
+        LOGGER.debug("join_cluster: {}", stdout);
     }
 
     public void startApp() throws Exception {
         String stdout = container()
                 .execInContainer("rabbitmqctl", "start_app")
                 .getStdout();
-        System.out.println(stdout);
-
+        LOGGER.debug("start_app: {}", stdout);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[11/17] james-project git commit: JAMES-2334 Demonstrate that routing is working

Posted by ma...@apache.org.
JAMES-2334 Demonstrate that routing is working


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/94ddf6a0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/94ddf6a0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/94ddf6a0

Branch: refs/heads/master
Commit: 94ddf6a0817b5adcf32666075cc1514ebd96e67e
Parents: b07fd7e
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 14:55:06 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/RabbitMQTest.java      | 136 +++++++++++++------
 1 file changed, 96 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/94ddf6a0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 2463516..5e8f311 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -55,8 +55,6 @@ import com.rabbitmq.client.ConnectionFactory;
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQTest {
 
-    private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
-
     @Nested
     class SingleConsumerTest {
 
@@ -81,7 +79,7 @@ class RabbitMQTest {
         }
 
         private void publishAMessage(Channel channel) throws IOException {
-            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, asBytes("Hello, world!"));
         }
 
         private Boolean messageReceived(Channel channel, String queueName) {
@@ -104,10 +102,10 @@ class RabbitMQTest {
         private Connection connection2;
         private Connection connection3;
         private Connection connection4;
-        private Channel publisherChannel;
-        private Channel subscriberChannel2;
-        private Channel subscriberChannel3;
-        private Channel subscriberChannel4;
+        private Channel channel1;
+        private Channel channel2;
+        private Channel channel3;
+        private Channel channel4;
 
         @BeforeEach
         void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
@@ -119,16 +117,16 @@ class RabbitMQTest {
             connection2 = connectionFactory2.newConnection();
             connection3 = connectionFactory3.newConnection();
             connection4 = connectionFactory4.newConnection();
-            publisherChannel = connection1.createChannel();
-            subscriberChannel2 = connection2.createChannel();
-            subscriberChannel3 = connection3.createChannel();
-            subscriberChannel4 = connection4.createChannel();
+            channel1 = connection1.createChannel();
+            channel2 = connection2.createChannel();
+            channel3 = connection3.createChannel();
+            channel4 = connection4.createChannel();
         }
 
         @AfterEach
         void tearDown() {
             closeQuietly(
-                publisherChannel, subscriberChannel2, subscriberChannel3, subscriberChannel4,
+                channel1, channel2, channel3, channel4,
                 connection1, connection2, connection3, connection4);
         }
 
@@ -151,28 +149,28 @@ class RabbitMQTest {
             @Test
             void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
                 // Declare a single exchange and three queues attached to it.
-                publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-
-                String queue2 = subscriberChannel2.queueDeclare().getQueue();
-                subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
-                String queue3 = subscriberChannel3.queueDeclare().getQueue();
-                subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
-                String queue4 = subscriberChannel4.queueDeclare().getQueue();
-                subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
-
-                InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
-                InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
-                InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
-                subscriberChannel2.basicConsume(queue2, consumer2);
-                subscriberChannel3.basicConsume(queue3, consumer3);
-                subscriberChannel4.basicConsume(queue4, consumer4);
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                channel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+                String queue3 = channel3.queueDeclare().getQueue();
+                channel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+                String queue4 = channel4.queueDeclare().getQueue();
+                channel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
 
                 // the publisher will produce 10 messages
                 IntStream.range(0, 10)
                     .mapToObj(String::valueOf)
-                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .map(RabbitMQTest.this::asBytes)
                     .forEach(Throwing.consumer(
-                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
@@ -196,23 +194,23 @@ class RabbitMQTest {
                 int nbMessages = 100;
 
                 // Declare the exchange and a single queue attached to it.
-                publisherChannel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
-                publisherChannel.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
-                publisherChannel.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, AUTO_DELETE, ImmutableMap.of());
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
                 // Publisher will produce 100 messages
                 IntStream.range(0, nbMessages)
                     .mapToObj(String::valueOf)
-                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .map(RabbitMQTest.this::asBytes)
                     .forEach(Throwing.consumer(
-                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-                InMemoryConsumer consumer2 = new InMemoryConsumer(subscriberChannel2);
-                InMemoryConsumer consumer3 = new InMemoryConsumer(subscriberChannel3);
-                InMemoryConsumer consumer4 = new InMemoryConsumer(subscriberChannel4);
-                subscriberChannel2.basicConsume(WORK_QUEUE, consumer2);
-                subscriberChannel3.basicConsume(WORK_QUEUE, consumer3);
-                subscriberChannel4.basicConsume(WORK_QUEUE, consumer4);
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel2.basicConsume(WORK_QUEUE, consumer2);
+                channel3.basicConsume(WORK_QUEUE, consumer3);
+                channel4.basicConsume(WORK_QUEUE, consumer4);
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, consumer4) == nbMessages);
@@ -229,6 +227,61 @@ class RabbitMQTest {
 
         }
 
+        @Nested
+        class Routing {
+            @Test
+            void rabbitMQShouldSupportRouting() throws Exception {
+                String conversation1 = "c1";
+                String conversation2 = "c2";
+                String conversation3 = "c3";
+                String conversation4 = "c4";
+
+                // Declare the exchange and a single queue attached to it.
+                channel1.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue1 = channel1.queueDeclare().getQueue();
+                // 1 will follow conversation 1 and 2
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation1);
+                channel1.queueBind(queue1, EXCHANGE_NAME, conversation2);
+
+                String queue2 = channel2.queueDeclare().getQueue();
+                // 2 will follow conversation 2 and 3
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation2);
+                channel2.queueBind(queue2, EXCHANGE_NAME, conversation3);
+
+                String queue3 = channel3.queueDeclare().getQueue();
+                // 3 will follow conversation 3 and 4
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation3);
+                channel3.queueBind(queue3, EXCHANGE_NAME, conversation4);
+
+                String queue4 = channel4.queueDeclare().getQueue();
+                // 4 will follow conversation 1 and 4
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation1);
+                channel4.queueBind(queue4, EXCHANGE_NAME, conversation4);
+
+                channel1.basicPublish(EXCHANGE_NAME, conversation1, NO_PROPERTIES, asBytes("1"));
+                channel2.basicPublish(EXCHANGE_NAME, conversation2, NO_PROPERTIES, asBytes("2"));
+                channel3.basicPublish(EXCHANGE_NAME, conversation3, NO_PROPERTIES, asBytes("3"));
+                channel4.basicPublish(EXCHANGE_NAME, conversation4, NO_PROPERTIES, asBytes("4"));
+
+                InMemoryConsumer consumer1 = new InMemoryConsumer(channel1);
+                InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
+                InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
+                InMemoryConsumer consumer4 = new InMemoryConsumer(channel4);
+                channel1.basicConsume(queue1, consumer1);
+                channel2.basicConsume(queue2, consumer2);
+                channel3.basicConsume(queue3, consumer3);
+                channel4.basicConsume(queue4, consumer4);
+
+                awaitAtMostOneMinute.until(() -> countReceivedMessages(consumer1, consumer2, consumer3, consumer4) == 8);
+
+                assertThat(consumer1.getConsumedMessages()).containsOnly(1, 2);
+                assertThat(consumer2.getConsumedMessages()).containsOnly(2, 3);
+                assertThat(consumer3.getConsumedMessages()).containsOnly(3, 4);
+                assertThat(consumer4.getConsumedMessages()).containsOnly(1, 4);
+            }
+        }
+
         private long countReceivedMessages(InMemoryConsumer... consumers) {
             return Arrays.stream(consumers)
                 .map(InMemoryConsumer::getConsumedMessages)
@@ -238,5 +291,8 @@ class RabbitMQTest {
 
     }
 
+    private byte[] asBytes(String message) {
+        return message.getBytes(StandardCharsets.UTF_8);
+    }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[03/17] james-project git commit: JAMES-2334 Introduce a dockerized rabbitMQ cluster

Posted by ma...@apache.org.
JAMES-2334 Introduce a dockerized rabbitMQ cluster


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/44395515
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/44395515
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/44395515

Branch: refs/heads/master
Commit: 44395515f58cfa7442927f209fd015ed7b6c723b
Parents: 8767d00
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:18:50 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |   5 +
 .../DockerClusterRabbitMQExtention.java         | 112 +++++++++++++++++++
 .../james/queue/rabbitmq/DockerRabbitMQ.java    |  53 ++++++++-
 .../queue/rabbitmq/DockerRabbitMQExtension.java |   4 +-
 .../queue/rabbitmq/RabbitMQClusterTest.java     |  43 +++++++
 5 files changed, 210 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index d7d8ad7..86bbfa2 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -54,6 +54,11 @@
             <artifactId>amqp-client</artifactId>
         </dependency>
         <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
new file mode 100644
index 0000000..fae2016
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
@@ -0,0 +1,112 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.google.common.collect.ImmutableList;
+
+public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+    public static final String RABBIT_1 = "rabbit1";
+    public static final String RABBIT_2 = "rabbit2";
+    public static final String RABBIT_3 = "rabbit3";
+    private DockerRabbitMQCluster cluster;
+    private Network network;
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        String cookie = DigestUtils.sha1Hex("secret cookie here");
+
+        network = Network.NetworkImpl.builder()
+            .enableIpv6(false)
+            .createNetworkCmdModifiers(ImmutableList.of())
+            .build();
+
+        DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
+        DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
+        DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
+
+        rabbitMQ1.start();
+        rabbitMQ2.start();
+        rabbitMQ3.start();
+
+        rabbitMQ2.join(rabbitMQ1);
+        rabbitMQ3.join(rabbitMQ1);
+
+        rabbitMQ2.startApp();
+        rabbitMQ3.startApp();
+
+        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        cluster.stop();
+        network.close();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return cluster;
+    }
+
+    public static class DockerRabbitMQCluster {
+
+        private final DockerRabbitMQ rabbitMQ1;
+        private final DockerRabbitMQ rabbitMQ2;
+        private final DockerRabbitMQ rabbitMQ3;
+
+        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+            this.rabbitMQ1 = rabbitMQ1;
+            this.rabbitMQ2 = rabbitMQ2;
+            this.rabbitMQ3 = rabbitMQ3;
+        }
+
+        public void stop() {
+            rabbitMQ1.stop();
+            rabbitMQ2.stop();
+            rabbitMQ3.stop();
+        }
+
+        public DockerRabbitMQ getRabbitMQ1() {
+            return rabbitMQ1;
+        }
+
+        public DockerRabbitMQ getRabbitMQ2() {
+            return rabbitMQ2;
+        }
+
+        public DockerRabbitMQ getRabbitMQ3() {
+            return rabbitMQ3;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 8fea5b0..a964b99 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,30 +18,50 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
+import java.util.Optional;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
 
 import com.rabbitmq.client.ConnectionFactory;
 
 public class DockerRabbitMQ {
     private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
 
+    private static final String DEFAULT_RABBIT_NODE = "my-rabbit";
     private static final int DEFAULT_RABBITMQ_PORT = 5672;
-    private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
     private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
     private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
+    private static final String RABBITMQ_ERLANG_COOKIE = "RABBITMQ_ERLANG_COOKIE";
+    private static final String RABBITMQ_NODENAME = "RABBITMQ_NODENAME";
+
+    private final GenericContainer<?> container;
+    private final Optional<String> nodeName;
 
-    private GenericContainer<?> container;
+    public static DockerRabbitMQ withCookieAndNodeName(String hostName, String erlangCookie, String nodeName, Network network) {
+        return new DockerRabbitMQ(Optional.ofNullable(hostName), Optional.ofNullable(erlangCookie), Optional.ofNullable(nodeName),
+            Optional.of(network));
+    }
+
+    public static DockerRabbitMQ withoutCookie() {
+        return new DockerRabbitMQ(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    }
 
     @SuppressWarnings("resource")
-    public DockerRabbitMQ() {
+    private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
         container = new GenericContainer<>("rabbitmq:3.7.5")
-                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
+                .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse("localhost")))
+                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
                 .withExposedPorts(DEFAULT_RABBITMQ_PORT)
                 .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this))
                 .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()));
+        net.ifPresent(container::withNetwork);
+        erlangCookie.ifPresent(cookie -> container.withEnv(RABBITMQ_ERLANG_COOKIE, cookie));
+        nodeName.ifPresent(name -> container.withEnv(RABBITMQ_NODENAME, name));
+        this.nodeName = nodeName;
     }
 
     public String getHostIp() {
@@ -81,4 +101,29 @@ public class DockerRabbitMQ {
         DockerClientFactory.instance().client()
             .restartContainerCmd(container.getContainerId());
     }
+
+    public GenericContainer<?> container() {
+        return container;
+    }
+
+    public String node() {
+        return nodeName.get();
+    }
+
+    public void join(DockerRabbitMQ rabbitMQ) throws Exception {
+        container()
+            .execInContainer("rabbitmqctl", "stop_app")
+            .getStdout();
+        container()
+            .execInContainer("rabbitmqctl", "join_cluster", rabbitMQ.node())
+            .getStdout();
+    }
+
+    public void startApp() throws Exception {
+        String stdout = container()
+                .execInContainer("rabbitmqctl", "start_app")
+                .getStdout();
+        System.out.println(stdout);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
index 925ff08..b0cbfd7 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
@@ -18,9 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
-import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.ParameterContext;
@@ -33,7 +31,7 @@ public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCal
 
     @Override
     public void beforeEach(ExtensionContext context) {
-        rabbitMQ = new DockerRabbitMQ();
+        rabbitMQ = DockerRabbitMQ.withoutCookie();
         rabbitMQ.start();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/44395515/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
new file mode 100644
index 0000000..8874641
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(DockerClusterRabbitMQExtention.class)
+class RabbitMQClusterTest {
+
+    @Test
+    void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+        String stdout = cluster.getRabbitMQ1().container()
+            .execInContainer("rabbitmqctl", "cluster_status")
+            .getStdout();
+
+        assertThat(stdout)
+            .contains(
+                DockerClusterRabbitMQExtention.RABBIT_1,
+                DockerClusterRabbitMQExtention.RABBIT_2,
+                DockerClusterRabbitMQExtention.RABBIT_3);
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[12/17] james-project git commit: JAMES-2334 Demonstrate that durability is working

Posted by ma...@apache.org.
JAMES-2334 Demonstrate that durability is working


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/fb67d5f2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/fb67d5f2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/fb67d5f2

Branch: refs/heads/master
Commit: fb67d5f2cebda481b2ff659d90a764e64133bc28
Parents: 94ddf6a
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:00:10 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../james/queue/rabbitmq/DockerRabbitMQ.java    |  6 ++
 .../james/queue/rabbitmq/RabbitMQTest.java      | 66 +++++++++++++++-----
 2 files changed, 55 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/fb67d5f2/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 0eab7a2..8c7bae8 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
+import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
 
 import com.rabbitmq.client.ConnectionFactory;
@@ -71,4 +72,9 @@ public class DockerRabbitMQ {
     public void stop() {
         container.stop();
     }
+
+    public void restart() {
+        DockerClientFactory.instance().client()
+            .restartContainerCmd(container.getContainerId());
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fb67d5f2/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 5e8f311..b81035e 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -58,16 +58,46 @@ class RabbitMQTest {
     @Nested
     class SingleConsumerTest {
 
+        private ConnectionFactory connectionFactory;
+        private Connection connection;
+        private Channel channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException {
+            connectionFactory = rabbitMQ.connectionFactory();
+            connection = connectionFactory.newConnection();
+            channel = connection.createChannel();
+        }
+
+        @AfterEach
+        void tearDown() {
+            closeQuietly(connection, channel);
+        }
+
+        @Test
+        void publishedEventWithoutSubscriberShouldNotBeLost() throws Exception {
+            String queueName = createQueue(channel);
+            publishAMessage(channel);
+            awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+        }
+
         @Test
-        void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
-            ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
-            try (Connection connection = connectionFactory.newConnection();
-                 Channel channel = connection.createChannel()) {
-                String queueName = createQueue(channel);
+        void demonstrateDurability(DockerRabbitMQ rabbitMQ) throws Exception {
+            String queueName = createQueue(channel);
+            publishAMessage(channel);
 
-                publishAMessage(channel);
+            rabbitMQ.restart();
 
-                awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+            awaitAtMostOneMinute.until(() -> containerIsRestarted(rabbitMQ));
+            assertThat(channel.basicGet(queueName, !AUTO_ACK)).isNotNull();
+        }
+
+        private Boolean containerIsRestarted(DockerRabbitMQ rabbitMQ) {
+            try {
+                rabbitMQ.connectionFactory().newConnection();
+                return true;
+            } catch (Exception e) {
+                return false;
             }
         }
 
@@ -130,16 +160,6 @@ class RabbitMQTest {
                 connection1, connection2, connection3, connection4);
         }
 
-        private void closeQuietly(AutoCloseable... closeables) {
-            for (AutoCloseable closeable : closeables) {
-                try {
-                    closeable.close();
-                } catch (Exception e) {
-                    //ignoring exception
-                }
-            }
-        }
-
         @Nested
         class BroadCast {
 
@@ -291,6 +311,18 @@ class RabbitMQTest {
 
     }
 
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            //ignore error
+        }
+    }
+
     private byte[] asBytes(String message) {
         return message.getBytes(StandardCharsets.UTF_8);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[04/17] james-project git commit: JAMES-2334 Introduce RabbitMQ implementation

Posted by ma...@apache.org.
JAMES-2334 Introduce RabbitMQ implementation


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/5041a0e6
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/5041a0e6
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/5041a0e6

Branch: refs/heads/master
Commit: 5041a0e65a19eefa8767ad53d94ac6ffa7c1fcd0
Parents: 61e94c6
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 14:31:23 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 pom.xml                             |  5 +++++
 server/pom.xml                      |  3 ++-
 server/queue/queue-rabbitmq/pom.xml | 37 ++++++++++++++++++++++++++++++++
 3 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0be0c28..91dfb14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1444,6 +1444,11 @@
             </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
+                <artifactId>james-server-queue-rabbitmq</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
                 <artifactId>james-server-queue-api</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 56545e3..f920a73 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -98,12 +98,13 @@
         <module>protocols/protocols-smtp</module>
         <module>protocols/webadmin-integration-test</module>
         <module>protocols/webadmin</module>
-        <module>queue/queue-activemq</module>
 
+        <module>queue/queue-activemq</module>
         <module>queue/queue-api</module>
         <module>queue/queue-file</module>
         <module>queue/queue-jms</module>
         <module>queue/queue-memory</module>
+        <module>queue/queue-rabbitmq</module>
 
         <module>task</module>
         <module>testing</module>

http://git-wip-us.apache.org/repos/asf/james-project/blob/5041a0e6/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
new file mode 100644
index 0000000..950089f
--- /dev/null
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.james</groupId>
+        <artifactId>james-server</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>james-server-queue-rabbitmq</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Apache James :: Server :: Mail Queue :: RabbitMQ</name>
+
+    <dependencies>
+    </dependencies>
+</project>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[05/17] james-project git commit: JAMES-2334 upgrade testcontainers

Posted by ma...@apache.org.
JAMES-2334 upgrade testcontainers


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8d568fac
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8d568fac
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8d568fac

Branch: refs/heads/master
Commit: 8d568fac49514fb308041150acdc27062e11e11f
Parents: 10dea04
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 11:21:28 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraWaitStrategy.java         | 19 +++++++++++--------
 .../backends/cassandra/DockerCassandraRule.java  |  5 +++--
 .../apache/james/mailbox/tika/TikaContainer.java |  2 +-
 pom.xml                                          |  2 +-
 .../james/util/docker/SwarmGenericContainer.java |  2 +-
 .../util/scanner/SpamAssassinExtension.java      |  2 +-
 .../util/scanner/SpamAssassinWaitStrategy.java   | 17 ++++++++++-------
 .../james/user/ldap/LdapGenericContainer.java    |  2 +-
 .../transport/mailets/SpamAssassinTest.java      |  2 +-
 .../org/apache/james/jmap/ContainerTest.java     |  2 +-
 .../java/org/apache/james/utils/FakeSmtp.java    |  2 +-
 11 files changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
index d6f71ff..bb47552 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraWaitStrategy.java
@@ -25,28 +25,31 @@ import java.util.concurrent.TimeUnit;
 
 import org.rnorth.ducttape.unreliables.Unreliables;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
 
 import com.google.common.primitives.Ints;
 
 public class CassandraWaitStrategy implements WaitStrategy {
 
     private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
-    private Duration timeout = DEFAULT_TIMEOUT;
+    private final GenericContainer<?> cassandraContainer;
+    private final Duration timeout;
 
-    public CassandraWaitStrategy() {
-        this(DEFAULT_TIMEOUT);
+    public CassandraWaitStrategy(GenericContainer<?> cassandraContainer) {
+        this(cassandraContainer, DEFAULT_TIMEOUT);
     }
 
-    public CassandraWaitStrategy(Duration timeout) {
+    public CassandraWaitStrategy(GenericContainer<?> cassandraContainer, Duration timeout) {
+        this.cassandraContainer = cassandraContainer;
         this.timeout = timeout;
     }
 
     @Override
-    public void waitUntilReady(@SuppressWarnings("rawtypes") GenericContainer container) {
+    public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
         Unreliables.retryUntilTrue(Ints.checkedCast(timeout.getSeconds()), TimeUnit.SECONDS, () -> {
                 try {
-                    return container
+                    return cassandraContainer
                         .execInContainer("cqlsh", "-e", "show host")
                         .getStdout()
                         .contains("Connected to Test Cluster");
@@ -59,6 +62,6 @@ public class CassandraWaitStrategy implements WaitStrategy {
 
     @Override
     public WaitStrategy withStartupTimeout(Duration startupTimeout) {
-        return new CassandraWaitStrategy(startupTimeout);
+        return new CassandraWaitStrategy(cassandraContainer, startupTimeout);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
----------------------------------------------------------------------
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
index 126d31f..327fa78 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/DockerCassandraRule.java
@@ -89,8 +89,9 @@ public class DockerCassandraRule implements TestRule {
             .withCreateContainerCmdModifier(cmd -> cmd.getHostConfig().withBinds(new Binds(new Bind(tmpFsName, new Volume("/var/lib/cassandra")))))
             .withCreateContainerCmdModifier(cmd -> cmd.withMemory(2000 * 1024 * 1024L))
             .withExposedPorts(CASSANDRA_PORT)
-            .withLogConsumer(this::displayDockerLog)
-            .waitingFor(new CassandraWaitStrategy());
+            .withLogConsumer(this::displayDockerLog);
+        cassandraContainer
+            .waitingFor(new CassandraWaitStrategy(cassandraContainer));
     }
 
     private void displayDockerLog(OutputFrame outputFrame) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
----------------------------------------------------------------------
diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
index 8bb7733..ec6b776 100644
--- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
+++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaContainer.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.james.util.docker.Images;
 import org.apache.james.util.docker.SwarmGenericContainer;
 import org.junit.rules.ExternalResource;
-import org.testcontainers.containers.wait.Wait;
+import org.testcontainers.containers.wait.strategy.Wait;
 
 import com.google.common.primitives.Ints;
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3fd8a14..75c3a58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -642,7 +642,7 @@
         <jetty.version>9.4.7.v20170914</jetty.version>
         <cassandra-unit.version>2.1.9.2</cassandra-unit.version>
         <assertj-guava.version>3.1.0</assertj-guava.version>
-        <testcontainers-version>1.4.2</testcontainers-version>
+        <testcontainers-version>1.7.3</testcontainers-version>
         <metrics.version>3.2.1</metrics.version>
         <joda.version>2.9.4</joda.version>
         <assertj.version>3.3.0</assertj.version>

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java b/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
index 20c66dd..e9fd984 100644
--- a/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/docker/SwarmGenericContainer.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
 import org.testcontainers.images.builder.ImageFromDockerfile;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
index 68bbfe4..46100bc 100644
--- a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
+++ b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinExtension.java
@@ -52,7 +52,7 @@ public class SpamAssassinExtension implements BeforeEachCallback, AfterEachCallb
                 .withFileFromClasspath("spamd.sh", "docker/spamassassin/spamd.sh")
                 .withFileFromClasspath("rule-update.sh", "docker/spamassassin/rule-update.sh")
                 .withFileFromClasspath("bayes_pg.sql", "docker/spamassassin/bayes_pg.sql"));
-        spamAssassinContainer.waitingFor(new SpamAssassinWaitStrategy());
+        spamAssassinContainer.waitingFor(new SpamAssassinWaitStrategy(spamAssassinContainer));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
index 108d0d4..1341256 100644
--- a/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
+++ b/server/container/util/src/test/java/org/apache/james/util/scanner/SpamAssassinWaitStrategy.java
@@ -25,28 +25,31 @@ import java.util.concurrent.TimeUnit;
 
 import org.rnorth.ducttape.unreliables.Unreliables;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.wait.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
 
 import com.google.common.primitives.Ints;
 
 public class SpamAssassinWaitStrategy implements WaitStrategy {
 
     private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
+    private final GenericContainer<?> spamAssassinContainer;
     private Duration timeout = DEFAULT_TIMEOUT;
 
-    public SpamAssassinWaitStrategy() {
-        this(DEFAULT_TIMEOUT);
+    public SpamAssassinWaitStrategy(GenericContainer<?> spamAssassinContainer) {
+        this(spamAssassinContainer, DEFAULT_TIMEOUT);
     }
 
-    public SpamAssassinWaitStrategy(Duration timeout) {
+    public SpamAssassinWaitStrategy(GenericContainer<?> spamAssassinContainer, Duration timeout) {
+        this.spamAssassinContainer = spamAssassinContainer;
         this.timeout = timeout;
     }
 
     @Override
-    public void waitUntilReady(@SuppressWarnings("rawtypes") GenericContainer container) {
+    public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
         Unreliables.retryUntilTrue(Ints.checkedCast(timeout.getSeconds()), TimeUnit.SECONDS, () -> {
                 try {
-                    return container
+                    return spamAssassinContainer
                         .execInContainer("spamassassin", "-V")
                         .getStdout()
                         .contains("SpamAssassin version 3.4.1");
@@ -59,6 +62,6 @@ public class SpamAssassinWaitStrategy implements WaitStrategy {
 
     @Override
     public WaitStrategy withStartupTimeout(Duration startupTimeout) {
-        return new SpamAssassinWaitStrategy(startupTimeout);
+        return new SpamAssassinWaitStrategy(spamAssassinContainer, startupTimeout);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
----------------------------------------------------------------------
diff --git a/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java b/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
index 4a6fb61..992b4bf 100644
--- a/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
+++ b/server/data/data-ldap-integration-testing/src/test/java/org/apache/james/user/ldap/LdapGenericContainer.java
@@ -20,7 +20,7 @@ package org.apache.james.user.ldap;
 
 import org.apache.james.util.docker.SwarmGenericContainer;
 import org.junit.rules.ExternalResource;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 import org.testcontainers.images.builder.ImageFromDockerfile;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
----------------------------------------------------------------------
diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
index 8ed9729..274b0e8 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/SpamAssassinTest.java
@@ -51,7 +51,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 public class SpamAssassinTest {
     private static final String SPAM_CONTENT = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X";

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
----------------------------------------------------------------------
diff --git a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
index bcc8a18..6e3d02b 100644
--- a/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
+++ b/server/protocols/jmap-integration-testing/jmap-integration-testing-common/src/test/java/org/apache/james/jmap/ContainerTest.java
@@ -30,7 +30,7 @@ import org.apache.james.util.docker.Images;
 import org.apache.james.util.docker.SwarmGenericContainer;
 import org.junit.Rule;
 import org.junit.Test;
-import org.testcontainers.containers.wait.HttpWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 
 public class ContainerTest {
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/8d568fac/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
----------------------------------------------------------------------
diff --git a/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java b/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
index a0694d2..4b5fc3a 100644
--- a/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
+++ b/server/testing/src/main/java/org/apache/james/utils/FakeSmtp.java
@@ -31,7 +31,7 @@ import org.apache.james.util.docker.SwarmGenericContainer;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
-import org.testcontainers.containers.wait.HostPortWaitStrategy;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 
 import com.jayway.awaitility.core.ConditionFactory;
 import com.jayway.restassured.builder.RequestSpecBuilder;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[17/17] james-project git commit: JAMES-2334 Add node killing tests for RabbitMQ cluster

Posted by ma...@apache.org.
JAMES-2334 Add node killing tests for RabbitMQ cluster


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/552e44d0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/552e44d0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/552e44d0

Branch: refs/heads/master
Commit: 552e44d073580b2565d163e007b3a3144c671bcc
Parents: 00adfa8
Author: benwa <bt...@linagora.com>
Authored: Fri Feb 9 10:50:27 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../DockerClusterRabbitMQExtension.java         | 120 ++++++++
 .../DockerClusterRabbitMQExtention.java         | 112 --------
 .../james/queue/rabbitmq/InMemoryConsumer.java  |  14 +-
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 281 ++++++++++++++-----
 4 files changed, 345 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
new file mode 100644
index 0000000..000fef6
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtension.java
@@ -0,0 +1,120 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.testcontainers.containers.Network;
+
+import com.google.common.collect.ImmutableList;
+import com.rabbitmq.client.Address;
+
+public class DockerClusterRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+    public static final String RABBIT_1 = "rabbit1";
+    public static final String RABBIT_2 = "rabbit2";
+    public static final String RABBIT_3 = "rabbit3";
+    private DockerRabbitMQCluster cluster;
+    private Network network;
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        String cookie = DigestUtils.sha1Hex("secret cookie here");
+
+        network = Network.NetworkImpl.builder()
+            .enableIpv6(false)
+            .createNetworkCmdModifiers(ImmutableList.of())
+            .build();
+
+        DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
+        DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
+        DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
+
+        rabbitMQ1.start();
+        rabbitMQ2.start();
+        rabbitMQ3.start();
+
+        rabbitMQ2.join(rabbitMQ1);
+        rabbitMQ3.join(rabbitMQ1);
+
+        rabbitMQ2.startApp();
+        rabbitMQ3.startApp();
+
+        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        cluster.stop();
+        network.close();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return cluster;
+    }
+
+    public static class DockerRabbitMQCluster {
+
+        private final DockerRabbitMQ rabbitMQ1;
+        private final DockerRabbitMQ rabbitMQ2;
+        private final DockerRabbitMQ rabbitMQ3;
+
+        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
+            this.rabbitMQ1 = rabbitMQ1;
+            this.rabbitMQ2 = rabbitMQ2;
+            this.rabbitMQ3 = rabbitMQ3;
+        }
+
+        public void stop() {
+            rabbitMQ1.stop();
+            rabbitMQ2.stop();
+            rabbitMQ3.stop();
+        }
+
+        public DockerRabbitMQ getRabbitMQ1() {
+            return rabbitMQ1;
+        }
+
+        public DockerRabbitMQ getRabbitMQ2() {
+            return rabbitMQ2;
+        }
+
+        public DockerRabbitMQ getRabbitMQ3() {
+            return rabbitMQ3;
+        }
+
+        public ImmutableList<Address> getAddresses() {
+            return ImmutableList.of(
+                new Address(rabbitMQ1.getHostIp(), rabbitMQ1.getPort()),
+                new Address(rabbitMQ2.getHostIp(), rabbitMQ2.getPort()),
+                new Address(rabbitMQ3.getHostIp(), rabbitMQ3.getPort()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
deleted file mode 100644
index fae2016..0000000
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerClusterRabbitMQExtention.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.queue.rabbitmq;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.testcontainers.containers.Network;
-
-import com.google.common.collect.ImmutableList;
-
-public class DockerClusterRabbitMQExtention implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
-
-    public static final String RABBIT_1 = "rabbit1";
-    public static final String RABBIT_2 = "rabbit2";
-    public static final String RABBIT_3 = "rabbit3";
-    private DockerRabbitMQCluster cluster;
-    private Network network;
-
-    @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
-        String cookie = DigestUtils.sha1Hex("secret cookie here");
-
-        network = Network.NetworkImpl.builder()
-            .enableIpv6(false)
-            .createNetworkCmdModifiers(ImmutableList.of())
-            .build();
-
-        DockerRabbitMQ rabbitMQ1 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_1, cookie, "rabbit@rabbit1", network);
-        DockerRabbitMQ rabbitMQ2 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_2, cookie, "rabbit@rabbit2", network);
-        DockerRabbitMQ rabbitMQ3 = DockerRabbitMQ.withCookieAndNodeName(RABBIT_3, cookie, "rabbit@rabbit3", network);
-
-        rabbitMQ1.start();
-        rabbitMQ2.start();
-        rabbitMQ3.start();
-
-        rabbitMQ2.join(rabbitMQ1);
-        rabbitMQ3.join(rabbitMQ1);
-
-        rabbitMQ2.startApp();
-        rabbitMQ3.startApp();
-
-        cluster = new DockerRabbitMQCluster(rabbitMQ1, rabbitMQ2, rabbitMQ3);
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) throws Exception {
-        cluster.stop();
-        network.close();
-    }
-
-    @Override
-    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return (parameterContext.getParameter().getType() == DockerRabbitMQCluster.class);
-    }
-
-    @Override
-    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return cluster;
-    }
-
-    public static class DockerRabbitMQCluster {
-
-        private final DockerRabbitMQ rabbitMQ1;
-        private final DockerRabbitMQ rabbitMQ2;
-        private final DockerRabbitMQ rabbitMQ3;
-
-        public DockerRabbitMQCluster(DockerRabbitMQ rabbitMQ1, DockerRabbitMQ rabbitMQ2, DockerRabbitMQ rabbitMQ3) {
-            this.rabbitMQ1 = rabbitMQ1;
-            this.rabbitMQ2 = rabbitMQ2;
-            this.rabbitMQ3 = rabbitMQ3;
-        }
-
-        public void stop() {
-            rabbitMQ1.stop();
-            rabbitMQ2.stop();
-            rabbitMQ3.stop();
-        }
-
-        public DockerRabbitMQ getRabbitMQ1() {
-            return rabbitMQ1;
-        }
-
-        public DockerRabbitMQ getRabbitMQ2() {
-            return rabbitMQ2;
-        }
-
-        public DockerRabbitMQ getRabbitMQ3() {
-            return rabbitMQ3;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
index f5f8fa2..6dd29af 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/InMemoryConsumer.java
@@ -30,15 +30,27 @@ import com.rabbitmq.client.Envelope;
 
 public class InMemoryConsumer extends DefaultConsumer {
 
+    @FunctionalInterface
+    interface Operation {
+        void perform();
+    }
+
     private final ConcurrentLinkedQueue<Integer> messages;
+    private final Operation operation;
 
     public InMemoryConsumer(Channel channel) {
+        this(channel, () -> {});
+    }
+
+    public InMemoryConsumer(Channel channel, Operation operation) {
         super(channel);
-        messages = new ConcurrentLinkedQueue<>();
+        this.operation = operation;
+        this.messages = new ConcurrentLinkedQueue<>();
     }
 
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+        operation.perform();
         Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
         messages.add(payload);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/552e44d0/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index 4a7dd07..d3f2cc1 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -30,116 +30,259 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
-import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
+import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtension.DockerRabbitMQCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-@ExtendWith(DockerClusterRabbitMQExtention.class)
+@ExtendWith(DockerClusterRabbitMQExtension.class)
 class RabbitMQClusterTest {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClusterTest.class);
+
     private static final String QUEUE = "queue";
 
-    private Connection node1Connection;
-    private Channel node1Channel;
-    private Connection node2Connection;
-    private Channel node2Channel;
-
-    @BeforeEach
-    void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
-        ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
-        node1Connection = node1ConnectionFactory.newConnection();
-        node2Connection = node2ConnectionFactory.newConnection();
-        node1Channel = node1Connection.createChannel();
-        node2Channel = node2Connection.createChannel();
-    }
+    @Nested
+    class ClusterSharing {
+
+        private ConnectionFactory node1ConnectionFactory;
+        private ConnectionFactory node2ConnectionFactory;
+        private Connection node1Connection;
+        private Connection node2Connection;
+        private Channel node1Channel;
+        private Channel node2Channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+            node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+            node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+            node1Connection = node1ConnectionFactory.newConnection();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node1Channel = node1Connection.createChannel();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        @AfterEach
+        void tearDown() {
+            closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+        }
+
+        @Test
+        void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
+            String stdout = cluster.getRabbitMQ1().container()
+                .execInContainer("rabbitmqctl", "cluster_status")
+                .getStdout();
+
+            assertThat(stdout)
+                .contains(
+                    DockerClusterRabbitMQExtension.RABBIT_1,
+                    DockerClusterRabbitMQExtension.RABBIT_2,
+                    DockerClusterRabbitMQExtension.RABBIT_3);
+        }
+
+        @Test
+        void queuesShouldBeShared() throws Exception {
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        @Test
+        void queuesShouldBeDeclarableOnAnotherNode() throws Exception {
+            node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer2);
+
+            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
 
-    @AfterEach
-    void tearDown() {
-        closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
     }
 
-    private void closeQuietly(AutoCloseable... closeables) {
-        for (AutoCloseable closeable : closeables) {
+    @Nested
+    class ClusterNodesFailure {
+
+        private ConnectionFactory node1ConnectionFactory;
+        private Connection resilientConnection;
+        private Channel resilientChannel;
+        private Connection node2Connection;
+        private Channel node2Channel;
+
+        @BeforeEach
+        void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+            node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+            resilientConnection = node1ConnectionFactory.newConnection(cluster.getAddresses());
+            resilientChannel = resilientConnection.createChannel();
+            ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+            node2Connection = node2ConnectionFactory.newConnection();
+            node2Channel = node2Connection.createChannel();
+        }
+
+        @AfterEach
+        void tearDown() {
+            closeQuietly(resilientConnection, resilientChannel);
+        }
+
+        @Disabled("For some reason, we are unable to recover topology when reconnecting")
+        @Test
+        void nodeKillingWhenProducing(DockerRabbitMQCluster cluster) throws Exception {
+            resilientChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            resilientChannel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            resilientChannel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+            int nbMessages = 20;
+            int firstBatchSize = nbMessages / 2;
+            IntStream.range(0, firstBatchSize)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+            InMemoryConsumer consumer = new InMemoryConsumer(node2Channel);
+            node2Channel.basicConsume(QUEUE, consumer);
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == firstBatchSize);
+
+            cluster.getRabbitMQ1().stop();
+
+            IntStream.range(firstBatchSize, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(this::tryPublishWithRetry);
+
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
+
+        private void tryPublishWithRetry(byte[] bytes) {
+            Awaitility.waitAtMost(Duration.ONE_MINUTE).pollInterval(Duration.ONE_SECOND).until(() -> tryPublish(bytes));
+        }
+
+        private boolean tryPublish(byte[] bytes) {
             try {
-                closeable.close();
+                resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes);
+                return true;
             } catch (Exception e) {
-                //ignoring exception
+                LOGGER.error("failed publish", e);
+                return false;
             }
         }
-    }
 
-    @Test
-    void rabbitMQManagerShouldReturnThreeNodesWhenAskingForStatus(DockerRabbitMQCluster cluster) throws Exception {
-        String stdout = cluster.getRabbitMQ1().container()
-            .execInContainer("rabbitmqctl", "cluster_status")
-            .getStdout();
-
-        assertThat(stdout)
-            .contains(
-                DockerClusterRabbitMQExtention.RABBIT_1,
-                DockerClusterRabbitMQExtention.RABBIT_2,
-                DockerClusterRabbitMQExtention.RABBIT_3);
-    }
+        @Test
+        void connectingToAClusterWithAFailedRabbit(DockerRabbitMQCluster cluster) throws Exception {
+            ConnectionFactory node3ConnectionFactory = cluster.getRabbitMQ3().connectionFactory();
+            cluster.getRabbitMQ3().stop();
 
-    @Test
-    void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
-        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-        node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-        node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+            try (Connection connection = node3ConnectionFactory.newConnection(cluster.getAddresses());
+                 Channel channel = connection.createChannel()) {
 
-        int nbMessages = 10;
-        IntStream.range(0, nbMessages)
-            .mapToObj(i -> asBytes(String.valueOf(i)))
-            .forEach(Throwing.consumer(
-                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+                channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+                channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
+                int nbMessages = 10;
+                IntStream.range(0, nbMessages)
+                    .mapToObj(i -> asBytes(String.valueOf(i)))
+                    .forEach(Throwing.consumer(
+                        bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
-        node2Channel.basicConsume(QUEUE, consumer2);
+                InMemoryConsumer consumer = new InMemoryConsumer(channel);
+                channel.basicConsume(QUEUE, consumer);
 
-        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+                awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
 
-        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
-        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
-    }
+                List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+                assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+            }
+        }
 
-    @Test
-    void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
-        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-        node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-        node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+        @Test
+        void nodeKillingWhenConsuming(DockerRabbitMQCluster cluster) throws Exception {
+            node2Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+            node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        int nbMessages = 10;
-        IntStream.range(0, nbMessages)
-            .mapToObj(i -> asBytes(String.valueOf(i)))
-            .forEach(Throwing.consumer(
-                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+            int nbMessages = 10;
+            IntStream.range(0, nbMessages)
+                .mapToObj(i -> asBytes(String.valueOf(i)))
+                .forEach(Throwing.consumer(
+                    bytes -> resilientChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
-        node2Channel.basicConsume(QUEUE, consumer2);
+            AtomicInteger counter = new AtomicInteger(0);
+            InMemoryConsumer consumer = new InMemoryConsumer(resilientChannel,
+                () -> {
+                    if (counter.incrementAndGet() == nbMessages / 2) {
+                        cluster.getRabbitMQ1().stop();
+                    }
+                });
+            resilientChannel.basicConsume(QUEUE, consumer);
 
-        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
+            awaitAtMostOneMinute.until(() -> consumer.getConsumedMessages().size() == nbMessages);
+
+            List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Guavate.toImmutableList());
+            assertThat(consumer.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+        }
 
-        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
-        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            //ignore error
+        }
     }
 
     private byte[] asBytes(String message) {
         return message.getBytes(StandardCharsets.UTF_8);
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[14/17] james-project git commit: JAMES-2334 Adding logs to rabbitMQ docker

Posted by ma...@apache.org.
JAMES-2334 Adding logs to rabbitMQ docker


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8767d00b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8767d00b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8767d00b

Branch: refs/heads/master
Commit: 8767d00bfb5a53a0e71e6f1f51b57d473ffb7e92
Parents: fb67d5f
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 11:03:40 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |  9 ++++++++
 .../james/queue/rabbitmq/DockerRabbitMQ.java    |  6 ++++-
 .../src/test/resources/logback-test.xml         | 23 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 3df0fb6..d7d8ad7 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -78,5 +78,14 @@
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index 8c7bae8..8fea5b0 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -18,12 +18,15 @@
  ****************************************************************/
 package org.apache.james.queue.rabbitmq;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
 
 import com.rabbitmq.client.ConnectionFactory;
 
 public class DockerRabbitMQ {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DockerRabbitMQ.class);
 
     private static final int DEFAULT_RABBITMQ_PORT = 5672;
     private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
@@ -37,7 +40,8 @@ public class DockerRabbitMQ {
         container = new GenericContainer<>("rabbitmq:3.7.5")
                 .withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
                 .withExposedPorts(DEFAULT_RABBITMQ_PORT)
-                .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this));
+                .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this))
+                .withLogConsumer(frame -> LOGGER.debug(frame.getUtf8String()));
     }
 
     public String getHostIp() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/8767d00b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..fa5a922
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/resources/logback-test.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+        <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+                <resetJUL>true</resetJUL>
+        </contextListener>
+
+        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+                <encoder>
+                        <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
+                        <immediateFlush>false</immediateFlush>
+                </encoder>
+        </appender>
+
+        <root level="ERROR">
+                <appender-ref ref="CONSOLE" />
+        </root>
+
+        <logger name="org.testcontainers" level="ERROR"/>
+        <logger name="org.apache.james" level="DEBUG"/>
+        <logger name="org.apache.james.queue.rabbitmq.DockerRabbitMQ" level="DEBUG"/>
+
+</configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[02/17] james-project git commit: JAMES-2334 Enforce common image usage for RabbitMQ

Posted by ma...@apache.org.
JAMES-2334 Enforce common image usage for RabbitMQ


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/21eec770
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/21eec770
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/21eec770

Branch: refs/heads/master
Commit: 21eec7701198a232ed9bd17b55a88f4dac083dbb
Parents: 552e44d
Author: benwa <bt...@linagora.com>
Authored: Tue May 29 10:10:50 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../src/test/java/org/apache/james/util/docker/Images.java     | 2 +-
 server/queue/queue-rabbitmq/pom.xml                            | 6 ++++++
 .../java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java   | 3 ++-
 3 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java b/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
index 7b75b45..549cf02 100644
--- a/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/docker/Images.java
@@ -21,7 +21,7 @@ package org.apache.james.util.docker;
 
 public interface Images {
     String FAKE_SMTP = "weave/rest-smtp-sink:latest";
-    String RABBITMQ = "rabbitmq:3";
+    String RABBITMQ = "rabbitmq:3.7.5";
     String ELASTICSEARCH = "elasticsearch:2.2.2";
     String NGINX = "nginx:1.7.1";
     String TIKA = "logicalspark/docker-tikaserver:1.15rc2";

http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 86bbfa2..c2fff5b 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -34,6 +34,12 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.james</groupId>
+            <artifactId>james-server-util-java8</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>com.github.fge</groupId>
             <artifactId>throwing-lambdas</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/21eec770/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
index a964b99..d890c2b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -20,6 +20,7 @@ package org.apache.james.queue.rabbitmq;
 
 import java.util.Optional;
 
+import org.apache.james.util.docker.Images;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
@@ -52,7 +53,7 @@ public class DockerRabbitMQ {
 
     @SuppressWarnings("resource")
     private DockerRabbitMQ(Optional<String> hostName, Optional<String> erlangCookie, Optional<String> nodeName, Optional<Network> net) {
-        container = new GenericContainer<>("rabbitmq:3.7.5")
+        container = new GenericContainer<>(Images.RABBITMQ)
                 .withCreateContainerCmdModifier(cmd -> cmd.withName(hostName.orElse("localhost")))
                 .withCreateContainerCmdModifier(cmd -> cmd.withHostName(hostName.orElse(DEFAULT_RABBIT_NODE)))
                 .withExposedPorts(DEFAULT_RABBITMQ_PORT)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[07/17] james-project git commit: JAMES-2334 Upgrade amqp client version

Posted by ma...@apache.org.
JAMES-2334 Upgrade amqp client version


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/61e94c6a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/61e94c6a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/61e94c6a

Branch: refs/heads/master
Commit: 61e94c6aae7b87d2c663154d0137930e2044c4cc
Parents: 8d568fa
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:16:17 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/61e94c6a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75c3a58..0be0c28 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1704,7 +1704,7 @@
             <dependency>
                 <groupId>com.rabbitmq</groupId>
                 <artifactId>amqp-client</artifactId>
-                <version>4.0.0</version>
+                <version>5.2.0</version>
             </dependency>
             <dependency>
                 <groupId>com.sparkjava</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[09/17] james-project git commit: JAMES-2334 Demonstrate that broadcast is working

Posted by ma...@apache.org.
JAMES-2334 Demonstrate that broadcast is working


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9f78f0d7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9f78f0d7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9f78f0d7

Branch: refs/heads/master
Commit: 9f78f0d75f5690b6680b44972423966629a3296a
Parents: 30679c4
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 7 10:01:45 2018 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |   8 +
 .../james/queue/rabbitmq/RabbitMQTest.java      | 145 ++++++++++++++++---
 2 files changed, 131 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 27b0c84..3df0fb6 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -34,6 +34,14 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>throwing-lambdas</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.steveash.guavate</groupId>
+            <artifactId>guavate</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/9f78f0d7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
index 465be65..cad0303 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -25,51 +25,152 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.IntStream;
 
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 
 @ExtendWith(DockerRabbitMQExtension.class)
 class RabbitMQTest {
 
     private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
 
-    @Test
-    void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
-        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
-        try (Connection connection = connectionFactory.newConnection();
-                Channel channel = connection.createChannel()) {
-            String queueName = createQueue(channel);
+    @Nested
+    class SingleConsumerTest {
 
-            publishAMessage(channel);
+        @Test
+        void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
+            ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+            try (Connection connection = connectionFactory.newConnection();
+                 Channel channel = connection.createChannel()) {
+                String queueName = createQueue(channel);
 
-            awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+                publishAMessage(channel);
+
+                awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+            }
         }
-    }
 
-    private String createQueue(Channel channel) throws IOException {
-        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-        String queueName = channel.queueDeclare().getQueue();
-        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
-        return queueName;
-    }
+        private String createQueue(Channel channel) throws IOException {
+            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+            String queueName = channel.queueDeclare().getQueue();
+            channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
+            return queueName;
+        }
+
+        private void publishAMessage(Channel channel) throws IOException {
+            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+        }
+
+        private Boolean messageReceived(Channel channel, String queueName) {
+            try {
+                return channel.basicGet(queueName, !AUTO_ACK) != null;
+            } catch (Exception e) {
+                return false;
+            }
+        }
 
-    private void publishAMessage(Channel channel) throws IOException {
-        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
     }
 
-    private Boolean messageReceived(Channel channel, String queueName) {
-        try {
-            return channel.basicGet(queueName, !AUTO_ACK) != null;
-        } catch (Exception e) {
-            return false;
+    @Nested
+    class BroadcastTest {
+
+        private ConnectionFactory connectionFactory1;
+        private ConnectionFactory connectionFactory2;
+        private ConnectionFactory connectionFactory3;
+        private ConnectionFactory connectionFactory4;
+
+        @BeforeEach
+        public void setup(DockerRabbitMQ rabbitMQ) {
+            connectionFactory1 = rabbitMQ.connectionFactory();
+            connectionFactory2 = rabbitMQ.connectionFactory();
+            connectionFactory3 = rabbitMQ.connectionFactory();
+            connectionFactory4 = rabbitMQ.connectionFactory();
+        }
+
+        // In the following case, each consumer will receive the messages produced by the
+        // producer
+        // To do so, each consumer will bind it's queue to the producer exchange.
+        @Test
+        public void rabbitMQShouldSupportTheBroadcastCase() throws Exception {
+            ImmutableList<Integer> expectedResult = IntStream.range(0, 10).boxed().collect(Guavate.toImmutableList());
+            ConcurrentLinkedQueue<Integer> results2 = new ConcurrentLinkedQueue<>();
+            ConcurrentLinkedQueue<Integer> results3 = new ConcurrentLinkedQueue<>();
+            ConcurrentLinkedQueue<Integer> results4 = new ConcurrentLinkedQueue<>();
+
+            try (Connection connection1 = connectionFactory1.newConnection();
+                 Channel publisherChannel = connection1.createChannel();
+                 Connection connection2 = connectionFactory2.newConnection();
+                 Channel subscriberChannel2 = connection2.createChannel();
+                 Connection connection3 = connectionFactory3.newConnection();
+                 Channel subscriberChannel3 = connection3.createChannel();
+                 Connection connection4 = connectionFactory4.newConnection();
+                 Channel subscriberChannel4 = connection4.createChannel()) {
+
+                // Declare a single exchange and three queues attached to it.
+                publisherChannel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+
+                String queue2 = subscriberChannel2.queueDeclare().getQueue();
+                subscriberChannel2.queueBind(queue2, EXCHANGE_NAME, ROUTING_KEY);
+                String queue3 = subscriberChannel3.queueDeclare().getQueue();
+                subscriberChannel3.queueBind(queue3, EXCHANGE_NAME, ROUTING_KEY);
+                String queue4 = subscriberChannel4.queueDeclare().getQueue();
+                subscriberChannel4.queueBind(queue4, EXCHANGE_NAME, ROUTING_KEY);
+
+                subscriberChannel2.basicConsume(queue2, storeInResultCallBack(subscriberChannel2, results2));
+                subscriberChannel3.basicConsume(queue3, storeInResultCallBack(subscriberChannel3, results3));
+                subscriberChannel4.basicConsume(queue4, storeInResultCallBack(subscriberChannel4, results4));
+
+                // the publisher will produce 10 messages
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(s -> s.getBytes(StandardCharsets.UTF_8))
+                    .forEach(Throwing.consumer(
+                        bytes -> publisherChannel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+
+                awaitAtMostOneMinute.until(() -> allMessageReceived(expectedResult, results2, results3, results4));
+
+                // Check every subscriber have receive all the messages.
+                assertThat(results2).containsOnlyElementsOf(expectedResult);
+                assertThat(results3).containsOnlyElementsOf(expectedResult);
+                assertThat(results4).containsOnlyElementsOf(expectedResult);
+            }
+        }
+
+        private boolean allMessageReceived(ImmutableList<Integer> expectedResult, ConcurrentLinkedQueue<Integer> results2, ConcurrentLinkedQueue<Integer> results3, ConcurrentLinkedQueue<Integer> results4) {
+            return Iterables.size(
+                Iterables.concat(results2, results3, results4))
+                == expectedResult.size() * 3;
+        }
+
+        private DefaultConsumer storeInResultCallBack(Channel channel, ConcurrentLinkedQueue<Integer> results) {
+            return new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    Integer payload = Integer.valueOf(new String(body, StandardCharsets.UTF_8));
+                    results.add(payload);
+                }
+            };
         }
     }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[06/17] james-project git commit: JAMES-2334 Provide a JUnit extension for RabbitMQ in Docker

Posted by ma...@apache.org.
JAMES-2334 Provide a JUnit extension for RabbitMQ in Docker


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8daf8951
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8daf8951
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8daf8951

Branch: refs/heads/master
Commit: 8daf8951bfaeb1e23a8e2bfda749d516799a2edd
Parents: 5041a0e
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:14:57 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             | 33 +++++++++
 .../james/queue/rabbitmq/DockerRabbitMQ.java    | 74 ++++++++++++++++++++
 .../queue/rabbitmq/DockerRabbitMQExtension.java | 55 +++++++++++++++
 .../rabbitmq/DockerRabbitMQExtensionTest.java   | 50 +++++++++++++
 .../queue/rabbitmq/RabbitMQWaitStrategy.java    | 67 ++++++++++++++++++
 5 files changed, 279 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 950089f..85d888d 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -33,5 +33,38 @@
     <name>Apache James :: Server :: Mail Queue :: RabbitMQ</name>
 
     <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
new file mode 100644
index 0000000..0eab7a2
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQ.java
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.testcontainers.containers.GenericContainer;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+public class DockerRabbitMQ {
+
+    private static final int DEFAULT_RABBITMQ_PORT = 5672;
+    private static final String DEFAULT_RABBITMQ_HOSTNAME = "my-rabbit";
+    private static final String DEFAULT_RABBITMQ_USERNAME = "guest";
+    private static final String DEFAULT_RABBITMQ_PASSWORD = "guest";
+
+    private GenericContainer<?> container;
+
+    @SuppressWarnings("resource")
+    public DockerRabbitMQ() {
+        container = new GenericContainer<>("rabbitmq:3.7.5")
+                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(DEFAULT_RABBITMQ_HOSTNAME))
+                .withExposedPorts(DEFAULT_RABBITMQ_PORT)
+                .waitingFor(RabbitMQWaitStrategy.withDefaultTimeout(this));
+    }
+
+    public String getHostIp() {
+        return container.getContainerIpAddress();
+    }
+
+    public Integer getPort() {
+        return container.getMappedPort(DEFAULT_RABBITMQ_PORT);
+    }
+
+    public String getUsername() {
+        return DEFAULT_RABBITMQ_USERNAME;
+    }
+
+    public String getPassword() {
+        return DEFAULT_RABBITMQ_PASSWORD;
+    }
+
+    public ConnectionFactory connectionFactory() {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(getHostIp());
+        connectionFactory.setPort(getPort());
+        connectionFactory.setUsername(getUsername());
+        connectionFactory.setPassword(getPassword());
+        return connectionFactory;
+    }
+
+    public void start() {
+        container.start();
+    }
+
+    public void stop() {
+        container.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
new file mode 100644
index 0000000..925ff08
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtension.java
@@ -0,0 +1,55 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+
+public class DockerRabbitMQExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+
+    private DockerRabbitMQ rabbitMQ;
+
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        rabbitMQ = new DockerRabbitMQ();
+        rabbitMQ.start();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        rabbitMQ.stop();
+    }
+
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == DockerRabbitMQ.class);
+    }
+
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return rabbitMQ;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
new file mode 100644
index 0000000..f2a0cea
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/DockerRabbitMQExtensionTest.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+public class DockerRabbitMQExtensionTest {
+
+    private ConnectionFactory connectionFactory;
+
+    @BeforeEach
+    public void setup(DockerRabbitMQ rabbitMQ) {
+        connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(rabbitMQ.getHostIp());
+        connectionFactory.setPort(rabbitMQ.getPort());
+        connectionFactory.setUsername(rabbitMQ.getUsername());
+        connectionFactory.setPassword(rabbitMQ.getPassword());
+    }
+
+    @Test
+    public void containerShouldBeUp() throws Exception {
+        try (Connection connection = connectionFactory.newConnection()) {
+            assertThat(connection.isOpen()).isTrue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/8daf8951/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
new file mode 100644
index 0000000..ba6ce3b
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQWaitStrategy.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategyTarget;
+
+import com.google.common.primitives.Ints;
+import com.rabbitmq.client.Connection;
+
+public class RabbitMQWaitStrategy implements WaitStrategy {
+
+    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
+
+    public static RabbitMQWaitStrategy withDefaultTimeout(DockerRabbitMQ rabbitMQ) {
+        return new RabbitMQWaitStrategy(rabbitMQ, DEFAULT_TIMEOUT);
+    }
+
+    private final DockerRabbitMQ rabbitMQ;
+    private final Duration timeout;
+
+    public RabbitMQWaitStrategy(DockerRabbitMQ rabbitMQ, Duration timeout) {
+        this.rabbitMQ = rabbitMQ;
+        this.timeout = timeout;
+    }
+
+    @Override
+    public void waitUntilReady(WaitStrategyTarget waitStrategyTarget) {
+        int seconds = Ints.checkedCast(this.timeout.getSeconds());
+
+        Unreliables.retryUntilTrue(seconds, TimeUnit.SECONDS, this::isConnected);
+    }
+
+    private Boolean isConnected() throws IOException, TimeoutException {
+        try (Connection connection = rabbitMQ.connectionFactory().newConnection()) {
+            return connection.isOpen();
+        }
+    }
+
+    @Override
+    public WaitStrategy withStartupTimeout(Duration startupTimeout) {
+        return new RabbitMQWaitStrategy(rabbitMQ, startupTimeout);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[08/17] james-project git commit: JAMES-2334 Demonstrate that published messages are not lost without consumer

Posted by ma...@apache.org.
JAMES-2334 Demonstrate that published messages are not lost without consumer


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/30679c47
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/30679c47
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/30679c47

Branch: refs/heads/master
Commit: 30679c47140f93d402bace10caaa7221c94690ef
Parents: 8daf895
Author: Antoine Duprat <ad...@linagora.com>
Authored: Tue Feb 6 16:44:03 2018 +0100
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |  4 ++
 .../james/queue/rabbitmq/RabbitMQFixture.java   | 46 ++++++++++++
 .../james/queue/rabbitmq/RabbitMQTest.java      | 75 ++++++++++++++++++++
 3 files changed, 125 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 85d888d..27b0c84 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -38,6 +38,10 @@
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.rabbitmq</groupId>
             <artifactId>amqp-client</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
new file mode 100644
index 0000000..e216690
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQFixture.java
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static com.jayway.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS;
+import static com.jayway.awaitility.Duration.ONE_MINUTE;
+
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
+import com.jayway.awaitility.core.ConditionFactory;
+import com.rabbitmq.client.AMQP;
+
+public class RabbitMQFixture {
+    public static final boolean DURABLE = true;
+    public static final boolean AUTO_ACK = true;
+    public static final AMQP.BasicProperties NO_PROPERTIES = null;
+    public static final String EXCHANGE_NAME = "exchangeName";
+    public static final String ROUTING_KEY = "routingKey";
+    public static final String DIRECT = "direct";
+
+    public static Duration slowPacedPollInterval = FIVE_HUNDRED_MILLISECONDS;
+    public static ConditionFactory calmlyAwait = Awaitility.with()
+        .pollInterval(slowPacedPollInterval)
+        .and()
+        .with()
+        .pollDelay(slowPacedPollInterval)
+        .await();
+    public static ConditionFactory awaitAtMostOneMinute = calmlyAwait.atMost(ONE_MINUTE);
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/30679c47/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
new file mode 100644
index 0000000..465be65
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQTest.java
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.AUTO_ACK;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+@ExtendWith(DockerRabbitMQExtension.class)
+class RabbitMQTest {
+
+    private static final byte[] PAYLOAD = "Hello, world!".getBytes(StandardCharsets.UTF_8);
+
+    @Test
+    void publishedEventWithoutSubscriberShouldNotBeLost(DockerRabbitMQ rabbitMQ) throws Exception {
+        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+        try (Connection connection = connectionFactory.newConnection();
+                Channel channel = connection.createChannel()) {
+            String queueName = createQueue(channel);
+
+            publishAMessage(channel);
+
+            awaitAtMostOneMinute.until(() -> messageReceived(channel, queueName));
+        }
+    }
+
+    private String createQueue(Channel channel) throws IOException {
+        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+        String queueName = channel.queueDeclare().getQueue();
+        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
+        return queueName;
+    }
+
+    private void publishAMessage(Channel channel) throws IOException {
+        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, PAYLOAD);
+    }
+
+    private Boolean messageReceived(Channel channel, String queueName) {
+        try {
+            return channel.basicGet(queueName, !AUTO_ACK) != null;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[15/17] james-project git commit: JAMES-2334 Test cross cluster operations

Posted by ma...@apache.org.
JAMES-2334 Test cross cluster operations


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/00adfa82
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/00adfa82
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/00adfa82

Branch: refs/heads/master
Commit: 00adfa82472cf1ed8e4cc04a5d386c397cda9801
Parents: 749e641
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon May 28 15:30:14 2018 +0200
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu May 31 09:47:02 2018 +0200

----------------------------------------------------------------------
 .../queue/rabbitmq/RabbitMQClusterTest.java     | 121 ++++++++++---------
 1 file changed, 66 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/00adfa82/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
index f88ecf4..4a7dd07 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQClusterTest.java
@@ -23,17 +23,21 @@ import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DIRECT;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.DURABLE;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.EXCLUSIVE;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES;
-import static org.apache.james.queue.rabbitmq.RabbitMQFixture.MESSAGES_AS_BYTES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.NO_PROPERTIES;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.ROUTING_KEY;
 import static org.apache.james.queue.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.james.queue.rabbitmq.DockerClusterRabbitMQExtention.DockerRabbitMQCluster;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
@@ -46,15 +50,36 @@ import com.rabbitmq.client.ConnectionFactory;
 @ExtendWith(DockerClusterRabbitMQExtention.class)
 class RabbitMQClusterTest {
 
-    public static final String QUEUE = "queue";
+    private static final String QUEUE = "queue";
+
+    private Connection node1Connection;
+    private Channel node1Channel;
+    private Connection node2Connection;
+    private Channel node2Channel;
+
+    @BeforeEach
+    void setup(DockerRabbitMQCluster cluster) throws IOException, TimeoutException {
+        ConnectionFactory node1ConnectionFactory = cluster.getRabbitMQ1().connectionFactory();
+        ConnectionFactory node2ConnectionFactory = cluster.getRabbitMQ2().connectionFactory();
+        node1Connection = node1ConnectionFactory.newConnection();
+        node2Connection = node2ConnectionFactory.newConnection();
+        node1Channel = node1Connection.createChannel();
+        node2Channel = node2Connection.createChannel();
+    }
 
     @AfterEach
-    public void tearDown(DockerRabbitMQCluster cluster) throws Exception {
-        cluster.getRabbitMQ2()
-            .connectionFactory()
-            .newConnection()
-            .createChannel()
-            .queueDelete(QUEUE);
+    void tearDown() {
+        closeQuietly(node1Channel, node2Channel, node1Connection, node2Connection);
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        for (AutoCloseable closeable : closeables) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                //ignoring exception
+            }
+        }
     }
 
     @Test
@@ -71,64 +96,50 @@ class RabbitMQClusterTest {
     }
 
     @Test
-    public void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
-        ConcurrentLinkedDeque<Integer> result = new ConcurrentLinkedDeque<>();
+    void queuesShouldBeShared(DockerRabbitMQCluster cluster) throws Exception {
+        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+        node1Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+        node1Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+        int nbMessages = 10;
+        IntStream.range(0, nbMessages)
+            .mapToObj(i -> asBytes(String.valueOf(i)))
+            .forEach(Throwing.consumer(
+                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-        try (Connection connection = connectionFactory1.newConnection();
-             Channel channel = connection.createChannel()) {
-
-            channel.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
-            channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-            channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
-
-            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
-                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
-        }
 
-        try (Connection connection2 = connectionFactory2.newConnection();
-             Channel channel2 = connection2.createChannel()) {
+        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+        node2Channel.basicConsume(QUEUE, consumer2);
 
-            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
-            channel2.basicConsume(QUEUE, consumer2);
+        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
 
-            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
-
-            assertThat(consumer2.getConsumedMessages())
-                .containsOnlyElementsOf(MESSAGES);
-        }
-
-        assertThat(result)
-            .containsOnlyElementsOf(MESSAGES);
+        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
     }
 
     @Test
-    public void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
-        ConnectionFactory connectionFactory1 = cluster.getRabbitMQ1().connectionFactory();
-        ConnectionFactory connectionFactory2 = cluster.getRabbitMQ2().connectionFactory();
+    void queuesShouldBeDeclarableOnAnotherNode(DockerRabbitMQCluster cluster) throws Exception {
+        node1Channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
+        node2Channel.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
+        node2Channel.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
 
-        try (Connection connection = connectionFactory1.newConnection();
-             Channel channel = connection.createChannel();
-             Connection connection2 = connectionFactory2.newConnection();
-             Channel channel2 = connection2.createChannel()) {
+        int nbMessages = 10;
+        IntStream.range(0, nbMessages)
+            .mapToObj(i -> asBytes(String.valueOf(i)))
+            .forEach(Throwing.consumer(
+                bytes -> node1Channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
 
-            channel.exchangeDeclare(EXCHANGE_NAME, DIRECT, DURABLE);
-            channel2.queueDeclare(QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, ImmutableMap.of()).getQueue();
-            channel2.queueBind(QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+        InMemoryConsumer consumer2 = new InMemoryConsumer(node2Channel);
+        node2Channel.basicConsume(QUEUE, consumer2);
 
-            MESSAGES_AS_BYTES.forEach(Throwing.consumer(
-                    bytes -> channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+        awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == nbMessages);
 
-            InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
-            channel2.basicConsume(QUEUE, consumer2);
-
-            awaitAtMostOneMinute.until(() -> consumer2.getConsumedMessages().size() == MESSAGES.size());
+        List<Integer> expectedResult = IntStream.range(0, nbMessages).boxed().collect(Collectors.toList());
+        assertThat(consumer2.getConsumedMessages()).containsOnlyElementsOf(expectedResult);
+    }
 
-            assertThat(consumer2.getConsumedMessages())
-                .containsOnlyElementsOf(MESSAGES);
-        }
+    private byte[] asBytes(String message) {
+        return message.getBytes(StandardCharsets.UTF_8);
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org