You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/15 04:36:10 UTC
[james-project] 02/09: JAMES-3305 Don't crash EventBus processing
upon invalid messages
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c74f8833dae8a746939ee94ec46f5d5fca7abeac
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Fri Jul 10 11:01:02 2020 +0700
JAMES-3305 Don't crash EventBus processing upon invalid messages
---
.../james/mailbox/events/GroupRegistration.java | 23 ++++++++++---
.../james/mailbox/events/RabbitMQEventBusTest.java | 40 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 5 deletions(-)
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index cade1c8..d95e6bd 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
@@ -35,6 +36,8 @@ import java.util.function.Predicate;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.event.json.EventSerializer;
import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
@@ -70,6 +73,7 @@ class GroupRegistration implements Registration {
}
}
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class);
static final String RETRY_COUNT = "retry-count";
static final int DEFAULT_RETRY_COUNT = 0;
@@ -138,13 +142,22 @@ class GroupRegistration implements Registration {
private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
byte[] eventAsBytes = acknowledgableDelivery.getBody();
- Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get();
int currentRetryCount = getRetryCount(acknowledgableDelivery);
- return delayGenerator.delayIfHaveTo(currentRetryCount)
- .flatMap(any -> runListener(event))
- .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
- .then(Mono.fromRunnable(acknowledgableDelivery::ack));
+ return deserializeEvent(eventAsBytes)
+ .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount)
+ .flatMap(any -> runListener(event))
+ .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
+ .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack)))
+ .onErrorResume(e -> {
+ LOGGER.error("Unable to process delivery for group {}", group, e);
+ return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE));
+ });
+ }
+
+ private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
+ return Mono.fromCallable(() -> eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get())
+ .subscribeOn(Schedulers.parallel());
}
Mono<Void> reDeliver(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 9641df3..810187c 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -53,6 +53,7 @@ import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -84,6 +85,7 @@ import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
@@ -172,6 +174,44 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
}
@Test
+ void eventProcessingShouldNotCrashOnInvalidMessage() {
+ EventCollector listener = new EventCollector();
+ EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+ eventBus.register(listener, registeredGroup);
+
+ String emptyRoutingKey = "";
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ emptyRoutingKey,
+ "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+ .block();
+
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+ await()
+ .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() ->
+ assertThat(listener.getEvents()).containsOnly(EVENT));
+ }
+
+ @Test
+ void eventProcessingShouldNotCrashOnInvalidMessages() {
+ EventCollector listener = new EventCollector();
+ EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+ eventBus.register(listener, registeredGroup);
+
+ String emptyRoutingKey = "";
+ IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+ emptyRoutingKey,
+ "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+ .block());
+
+ eventBus.dispatch(EVENT, NO_KEYS).block();
+ await()
+ .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() ->
+ assertThat(listener.getEvents()).containsOnly(EVENT));
+ }
+
+ @Test
void deserializeEventCollectorGroup() throws Exception {
assertThat(Group.deserialize("org.apache.james.mailbox.util.EventCollector$EventCollectorGroup"))
.isEqualTo(new EventCollector.EventCollectorGroup());
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org