You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/11/26 15:31:39 UTC
[james-project] 04/07: JAMES-2813 add test to demonstrate the use
of single active consumer with rabbitmq 3.8
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 93496e1d98915c1073d3767f3300aeddcbdf5754
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Mon Oct 21 10:17:43 2019 +0200
JAMES-2813 add test to demonstrate the use of single active consumer with rabbitmq 3.8
---
.../james/backends/rabbitmq/RabbitMQTest.java | 149 +++++++++++++++++++--
1 file changed, 141 insertions(+), 8 deletions(-)
diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
index b673a10..a41a903 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQTest.java
@@ -37,10 +37,13 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterEach;
@@ -59,6 +62,7 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
class RabbitMQTest {
@@ -200,8 +204,8 @@ class RabbitMQTest {
IntStream.range(0, 10)
.mapToObj(String::valueOf)
.map(RabbitMQTest.this::asBytes)
- .forEach(Throwing.consumer(
- bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
awaitAtMostOneMinute.until(
() -> countReceivedMessages(consumer2, consumer3, consumer4) == 30);
@@ -233,8 +237,8 @@ class RabbitMQTest {
IntStream.range(0, nbMessages)
.mapToObj(String::valueOf)
.map(RabbitMQTest.this::asBytes)
- .forEach(Throwing.consumer(
- bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
InMemoryConsumer consumer2 = new InMemoryConsumer(channel2);
InMemoryConsumer consumer3 = new InMemoryConsumer(channel3);
@@ -265,8 +269,8 @@ class RabbitMQTest {
IntStream.range(0, 10)
.mapToObj(String::valueOf)
.map(RabbitMQTest.this::asBytes)
- .forEach(Throwing.consumer(
- bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
ConcurrentLinkedQueue<Integer> receivedMessages = new ConcurrentLinkedQueue<>();
String dyingConsumerTag = "dyingConsumer";
@@ -297,8 +301,8 @@ class RabbitMQTest {
IntStream.range(0, 10)
.mapToObj(String::valueOf)
.map(RabbitMQTest.this::asBytes)
- .forEach(Throwing.consumer(
- bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)));
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
String dyingConsumerTag = "dyingConsumer";
ImmutableMap<String, Object> arguments = ImmutableMap.of();
@@ -327,6 +331,135 @@ class RabbitMQTest {
assertThat(fallbackConsumer.getConsumedMessages()).contains(1, 2).doesNotContain(0);
}
+ @Test
+ void rabbitMQShouldDeliverMessageToSingleActiveConsumer() throws Exception {
+ channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+ channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ IntStream.range(0, 10)
+ .mapToObj(String::valueOf)
+ .map(RabbitMQTest.this::asBytes)
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+ channel2.basicQos(1);
+ channel3.basicQos(1);
+
+ AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+ AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+ String firstRegisteredConsumer = "firstRegisteredConsumer";
+ ImmutableMap<String, Object> arguments = ImmutableMap.of();
+ channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> incrementCountForConsumerAndAckMessage(firstRegisteredConsumerMessageCount, message, channel2),
+ (consumerTag -> {
+ }));
+ channel3.basicConsume(WORK_QUEUE, !AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> incrementCountForConsumerAndAckMessage(secondRegisteredConsumerMessageCount, message, channel3),
+ consumerTag -> { });
+
+ awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+ assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(10);
+ assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(0);
+ }
+
+ private void incrementCountForConsumerAndAckMessage(AtomicInteger firstRegisteredConsumerMessageCount, Delivery message, Channel channel2) throws IOException {
+ try {
+ firstRegisteredConsumerMessageCount.incrementAndGet();
+ TimeUnit.SECONDS.sleep(1);
+ channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
+ } catch (InterruptedException e) {
+ //do nothing
+ }
+ }
+
+ @Test
+ void rabbitMQShouldProvideSingleActiveConsumerName() throws Exception {
+ channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+ channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, "foo".getBytes(StandardCharsets.UTF_8));
+
+ AtomicInteger deliveredMessagesCount = new AtomicInteger(0);
+
+ String firstRegisteredConsumer = "firstRegisteredConsumer";
+ ImmutableMap<String, Object> arguments = ImmutableMap.of();
+ channel2.basicConsume(WORK_QUEUE, AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+ (consumerTag -> { }));
+ channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "starvingConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> deliveredMessagesCount.incrementAndGet(),
+ consumerTag -> { });
+
+ awaitAtMostOneMinute.until(() -> deliveredMessagesCount.get() > 0);
+ awaitAtMostOneMinute.until(() -> rabbitMQExtension.managementAPI()
+ .queueDetails("/", WORK_QUEUE)
+ .consumerDetails.isEmpty() == false);
+
+ List<String> currentConsumerName = rabbitMQExtension.managementAPI()
+ .queueDetails("/", WORK_QUEUE)
+ .consumerDetails
+ .stream()
+ .filter(consumer -> consumer.status == RabbitMQManagementAPI.ActivityStatus.SingleActive)
+ .map(RabbitMQManagementAPI.ConsumerDetails::getTag)
+ .collect(Collectors.toList());
+
+ assertThat(currentConsumerName)
+ .hasSize(1)
+ .first()
+ .isEqualTo(firstRegisteredConsumer);
+ }
+
+ @Test
+ void rabbitMQShouldDeliverMessageToFallbackSingleActiveConsumer() throws Exception {
+ channel1.exchangeDeclare(EXCHANGE_NAME, "direct", DURABLE);
+ channel1.queueDeclare(WORK_QUEUE, DURABLE, !EXCLUSIVE, !AUTO_DELETE, Constants.WITH_SINGLE_ACTIVE_CONSUMER);
+ channel1.queueBind(WORK_QUEUE, EXCHANGE_NAME, ROUTING_KEY);
+
+ IntStream.range(0, 10)
+ .mapToObj(String::valueOf)
+ .map(RabbitMQTest.this::asBytes)
+ .forEach(Throwing.<byte[]>consumer(
+ bytes -> channel1.basicPublish(EXCHANGE_NAME, ROUTING_KEY, NO_PROPERTIES, bytes)).sneakyThrow());
+
+ AtomicInteger firstRegisteredConsumerMessageCount = new AtomicInteger(0);
+ AtomicInteger secondRegisteredConsumerMessageCount = new AtomicInteger(0);
+
+ String firstRegisteredConsumer = "firstRegisteredConsumer";
+ ImmutableMap<String, Object> arguments = ImmutableMap.of();
+ channel2.basicConsume(WORK_QUEUE, !AUTO_ACK, firstRegisteredConsumer, !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> {
+ try {
+ if (firstRegisteredConsumerMessageCount.get() < 5) {
+ channel2.basicAck(message.getEnvelope().getDeliveryTag(), !MULTIPLE);
+ firstRegisteredConsumerMessageCount.incrementAndGet();
+ } else {
+ channel2.basicNack(message.getEnvelope().getDeliveryTag(), !MULTIPLE, REQUEUE);
+ }
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ //do nothing
+ }
+ },
+ (consumerTag -> { }));
+ channel3.basicConsume(WORK_QUEUE, AUTO_ACK, "fallbackConsumer", !NO_LOCAL, !EXCLUSIVE, arguments,
+ (consumerTag, message) -> {
+ secondRegisteredConsumerMessageCount.incrementAndGet();
+ },
+ consumerTag -> { });
+
+ awaitAtMostOneMinute.until(() -> firstRegisteredConsumerMessageCount.get() == 5);
+
+ channel2.basicCancel(firstRegisteredConsumer);
+
+ awaitAtMostOneMinute.until(() -> (firstRegisteredConsumerMessageCount.get() + secondRegisteredConsumerMessageCount.get()) == 10);
+
+ assertThat(firstRegisteredConsumerMessageCount.get()).isEqualTo(5);
+ assertThat(secondRegisteredConsumerMessageCount.get()).isEqualTo(5);
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org