You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:39 UTC

[james-project] 04/07: JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 93496e1d98915c1073d3767f3300aeddcbdf5754
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:17:43 2019 +0200

    JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8
---
 .../james/backends/rabbitmq/RabbitMQTest.java      | 149 +++++++++++++++++++--
 1 file changed, 141 insertions(+), 8 deletions(-)

diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
index b673a10..a41a903 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
@@ -37,10 +37,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.AfterEach;
@@ -59,6 +62,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
 
 class RabbitMQTest {
 
@@ -200,8 +204,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                     .mapToObj(String::valueOf)
                     .map(RabbitMQTest.this::asBytes)
-                    .forEach(Throwing.consumer(
-                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 awaitAtMostOneMinute.until(
                     () -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
@@ -233,8 +237,8 @@ class RabbitMQTest {
                 IntStream.range(0, nbMessages)
                     .mapToObj(String::valueOf)
                     .map(RabbitMQTest.this::asBytes)
-                    .forEach(Throwing.consumer(
-                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
                 InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
@@ -265,8 +269,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                         .mapToObj(String::valueOf)
                         .map(RabbitMQTest.this::asBytes)
-                        .forEach(Throwing.consumer(
-                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        .forEach(Throwing.<byte[]>consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>();
                 String dyingConsumerTag = "dyingConsumer";
@@ -297,8 +301,8 @@ class RabbitMQTest {
                 IntStream.range(0, 10)
                         .mapToObj(String::valueOf)
                         .map(RabbitMQTest.this::asBytes)
-                        .forEach(Throwing.consumer(
-                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+                        .forEach(Throwing.<byte[]>consumer(
+                                bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
 
                 String dyingConsumerTag = "dyingConsumer";
                 ImmutableMap<String, Object> arguments = ImmutableMap.of();
@@ -327,6 +331,135 @@ class RabbitMQTest {
                 assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0);
             }
 
+            @Test
+            void rabbitMQShouldDeliverMessageToSingleActiveConsumer() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+                channel2.basicQos(1);
+                channel3.basicQos(1);
+
+                AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+                AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> incrementCountForConsumerAndAckMessage(firstRegisteredConsumerMessageCount, message, channel2),
+                    (consumerTag -> {
+                    }));
+                channel3.basicConsume(WORK_QUEUE, !AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> incrementCountForConsumerAndAckMessage(secondRegisteredConsumerMessageCount, message, channel3),
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+                assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(10);
+                assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(0);
+            }
+
+            private void incrementCountForConsumerAndAckMessage(AtomicInteger firstRegisteredConsumerMessageCount, Delivery message, Channel channel2) throws IOException {
+                try {
+                    firstRegisteredConsumerMessageCount.incrementAndGet();
+                    TimeUnit.SECONDS.sleep(1);
+                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
+                } catch (InterruptedException e) {
+                    //do nothing
+                }
+            }
+
+            @Test
+            void rabbitMQShouldProvideSingleActiveConsumerName() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, "foo".getBytes(StandardCharsets.UTF_8));
+
+                AtomicInteger deliveredMessagesCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+                    (consumerTag -> { }));
+                channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> deliveredMessagesCount.get() > 0);
+                awaitAtMostOneMinute.until(() -> rabbitMQExtension.managementAPI()
+                    .queueDetails("/", WORK_QUEUE)
+                    .consumerDetails.isEmpty() == false);
+
+                List<String> currentConsumerName = rabbitMQExtension.managementAPI()
+                    .queueDetails("/", WORK_QUEUE)
+                    .consumerDetails
+                    .stream()
+                    .filter(consumer -> consumer.status == RabbitMQManagementAPI.ActivityStatus.SingleActive)
+                    .map(RabbitMQManagementAPI.ConsumerDetails::getTag)
+                    .collect(Collectors.toList());
+
+                assertThat(currentConsumerName)
+                    .hasSize(1)
+                    .first()
+                    .isEqualTo(firstRegisteredConsumer);
+            }
+
+            @Test
+            void rabbitMQShouldDeliverMessageToFallbackSingleActiveConsumer() throws Exception {
+                channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+                channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+                channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+                IntStream.range(0, 10)
+                    .mapToObj(String::valueOf)
+                    .map(RabbitMQTest.this::asBytes)
+                    .forEach(Throwing.<byte[]>consumer(
+                        bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+                AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+                AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+                String firstRegisteredConsumer = "firstRegisteredConsumer";
+                ImmutableMap<String, Object> arguments = ImmutableMap.of();
+                channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> {
+                        try {
+                            if (firstRegisteredConsumerMessageCount.get() < 5) {
+                                channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE);
+                                firstRegisteredConsumerMessageCount.incrementAndGet();
+                            } else {
+                                channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE);
+                            }
+                            TimeUnit.SECONDS.sleep(1);
+                        } catch (InterruptedException e) {
+                            //do nothing
+                        }
+                    },
+                    (consumerTag -> { }));
+                channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+                    (consumerTag, message) -> {
+                        secondRegisteredConsumerMessageCount.incrementAndGet();
+                    },
+                    consumerTag -> { });
+
+                awaitAtMostOneMinute.until(() -> firstRegisteredConsumerMessageCount.get() == 5);
+
+                channel2.basicCancel(firstRegisteredConsumer);
+
+                awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+                assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(5);
+                assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(5);
+            }
         }
 
         @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org