You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/16 03:24:59 UTC

[james-project] 02/02: JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c23267ac9b8021ee000768eb309c7f64c95d7a8f
Author: duc91 <vd...@linagora.com>
AuthorDate: Tue Jul 14 15:01:16 2020 +0700

    JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling
---
 .../distributed/RabbitMQTerminationSubscriber.java |  6 +--
 .../RabbitMQTerminationSubscriberTest.java         | 54 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 2c1812f..46144d7 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -56,9 +56,9 @@ import reactor.rabbitmq.Sender;
 
 public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
-    private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
-    private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
-    private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
+    static final String EXCHANGE_NAME = "terminationSubscriberExchange";
+    static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
+    static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
 
     private final JsonEventSerializer serializer;
     private final Sender sender;
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 77cfeb7..3da35db 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -20,12 +20,19 @@
 
 package org.apache.james.task.eventsourcing.distributed;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.EXCHANGE_NAME;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.ROUTING_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.TEN_SECONDS;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.eventsourcing.Event;
@@ -36,11 +43,14 @@ import org.apache.james.json.DTOConverter;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.eventsourcing.TerminationSubscriber;
 import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
+import org.assertj.core.api.SoftAssertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
 
 class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
     private static final JsonTaskSerializer TASK_SERIALIZER = JsonTaskSerializer.of();
@@ -52,7 +62,8 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
 
     @Override
     public TerminationSubscriber subscriber() {
-        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(),
+            rabbitMQExtension.getReceiverProvider(), SERIALIZER);
         subscriber.start();
         return subscriber;
     }
@@ -77,4 +88,45 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
         assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
         assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
     }
+
+    // One undecodable payload is published first; the valid event sent afterwards must still be delivered.
+    @Test
+    void eventProcessingShouldNotCrashOnInvalidMessage() {
+        TerminationSubscriber subscriber1 = subscriber();
+        Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY,
+                "BAD_PAYLOAD!".getBytes(UTF_8))))
+            .block();
+
+        sendEvents(subscriber1, COMPLETED_EVENT);
+
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+
+        await().timeout(TEN_SECONDS).untilAsserted(() -> assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT));
+    }
+
+    // Ten undecodable payloads are published first; the valid event sent afterwards must still be delivered.
+    @Test
+    void eventProcessingShouldNotCrashOnInvalidMessages() {
+        TerminationSubscriber subscriber1 = subscriber();
+        Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+        IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY,
+                "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+            .block());
+
+        sendEvents(subscriber1, COMPLETED_EVENT);
+
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+
+        // Assert through the supplied SoftAssertions instance so the soft collector actually records the check.
+        await().atMost(ONE_MINUTE).untilAsserted(() ->
+            SoftAssertions.assertSoftly(soft ->
+                soft.assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT)));
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org