You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/16 03:24:59 UTC
[james-project] 02/02: JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c23267ac9b8021ee000768eb309c7f64c95d7a8f
Author: duc91 <vd...@linagora.com>
AuthorDate: Tue Jul 14 15:01:16 2020 +0700
JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling
---
.../distributed/RabbitMQTerminationSubscriber.java | 6 +--
.../RabbitMQTerminationSubscriberTest.java | 54 +++++++++++++++++++++-
2 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 2c1812f..46144d7 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -56,9 +56,9 @@ import reactor.rabbitmq.Sender;
public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
- private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
- private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
- private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
+ static final String EXCHANGE_NAME = "terminationSubscriberExchange";
+ static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
+ static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
private final JsonEventSerializer serializer;
private final Sender sender;
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 77cfeb7..3da35db 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -20,12 +20,19 @@
package org.apache.james.task.eventsourcing.distributed;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.EXCHANGE_NAME;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.ROUTING_KEY;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.TEN_SECONDS;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.stream.IntStream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.eventsourcing.Event;
@@ -36,11 +43,14 @@ import org.apache.james.json.DTOConverter;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.task.eventsourcing.TerminationSubscriber;
import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
+import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
private static final JsonTaskSerializer TASK_SERIALIZER = JsonTaskSerializer.of();
@@ -52,7 +62,8 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
@Override
public TerminationSubscriber subscriber() {
- RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+ RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(),
+ rabbitMQExtension.getReceiverProvider(), SERIALIZER);
subscriber.start();
return subscriber;
}
@@ -77,4 +88,45 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
}
+
+ @Test
+ void eventProcessingShouldNotCrashOnInvalidMessage() {
+ TerminationSubscriber subscriber1 = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+ ROUTING_KEY,
+ "BAD_PAYLOAD!".getBytes(UTF_8))))
+ .block();
+
+ sendEvents(subscriber1, COMPLETED_EVENT);
+
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+
+ await().timeout(TEN_SECONDS).untilAsserted(() -> assertThat(receivedEventsFirst).hasSize(1));
+ }
+
+ @Test
+ void eventProcessingShouldNotCrashOnInvalidMessages() {
+ TerminationSubscriber subscriber1 = subscriber();
+ Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+ IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+ ROUTING_KEY,
+ "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+ .block());
+
+ sendEvents(subscriber1, COMPLETED_EVENT);
+
+ List<Event> receivedEventsFirst = new ArrayList<>();
+ firstListener.subscribe(receivedEventsFirst::add);
+
+ await().atMost(ONE_MINUTE).untilAsserted(() ->
+ SoftAssertions.assertSoftly(soft -> {
+ assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
+ }));
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org