You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/15 04:36:10 UTC

[james-project] 02/09: JAMES-3305 Don't crash EventBus processing upon invalid messages

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c74f8833dae8a746939ee94ec46f5d5fca7abeac
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Jul 10 11:01:02 2020 +0700

    JAMES-3305 Don't crash EventBus processing upon invalid messages
---
 .../james/mailbox/events/GroupRegistration.java    | 23 ++++++++++---
 .../james/mailbox/events/RabbitMQEventBusTest.java | 40 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 5 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index cade1c8..d95e6bd 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
@@ -35,6 +36,8 @@ import java.util.function.Predicate;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.event.json.EventSerializer;
 import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -70,6 +73,7 @@ class GroupRegistration implements Registration {
         }
     }
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
     static final String RETRY_COUNT = "retry-count";
     static final int DEFAULT_RETRY_COUNT = 0;
 
@@ -138,13 +142,22 @@ class GroupRegistration implements Registration {
 
     private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
         byte[] eventAsBytes = acknowledgableDelivery.getBody();
-        Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get();
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
-        return delayGenerator.delayIfHaveTo(currentRetryCount)
-            .flatMap(any -> runListener(event))
-            .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
-            .then(Mono.fromRunnable(acknowledgableDelivery::ack));
+        return deserializeEvent(eventAsBytes)
+            .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount)
+                .flatMap(any -> runListener(event))
+                .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
+                .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack)))
+            .onErrorResume(e -> {
+                LOGGER.error("Unable to process delivery for group {}", group, e);
+                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE));
+            });
+    }
+
+    private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
+        return Mono.fromCallable(() -> eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get())
+            .subscribeOn(Schedulers.parallel());
     }
 
     Mono<Void> reDeliver(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 9641df3..810187c 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -53,6 +53,7 @@ import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -84,6 +85,7 @@ import com.google.common.collect.ImmutableSet;
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
@@ -172,6 +174,44 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     }
 
     @Test
+    void eventProcessingShouldNotCrashOnInvalidMessage() {
+        EventCollector listener = new EventCollector();
+        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+        eventBus.register(listener, registeredGroup);
+
+        String emptyRoutingKey = "";
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+                emptyRoutingKey,
+                "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+            .block();
+
+        eventBus.dispatch(EVENT, NO_KEYS).block();
+        await()
+            .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() ->
+                assertThat(listener.getEvents()).containsOnly(EVENT));
+    }
+
+    @Test
+    void eventProcessingShouldNotCrashOnInvalidMessages() {
+        EventCollector listener = new EventCollector();
+        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+        eventBus.register(listener, registeredGroup);
+
+        String emptyRoutingKey = "";
+        IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+                emptyRoutingKey,
+                "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+            .block());
+
+        eventBus.dispatch(EVENT, NO_KEYS).block();
+        await()
+            .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() ->
+            assertThat(listener.getEvents()).containsOnly(EVENT));
+    }
+
+    @Test
     void deserializeEventCollectorGroup() throws Exception {
         assertThat(Group.deserialize("org.apache.james.mailbox.util.EventCollector$EventCollectorGroup"))
             .isEqualTo(new EventCollector.EventCollectorGroup());


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org