You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:20 UTC

[james-project] branch master updated (ccc5e48 -> aa6ac0b)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from ccc5e48  JAMES-3265 Leverage default method to offer a convenient single message flag update
     new ea6e884  JAMES-3290 Mails failing to load should not be lost by dequeue
     new 74896fd  JAMES-3138 Cassandra testing instrumentation should allow injecting empty results
     new b3ecc46  JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message references a blob that no longer exists.
     new b6d0467  JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash
     new fd57eb7  JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message is in an invalid format.
     new 9f0a943  JAMES-3290 Many invalid JSON should not abort dequeue
     new aa6ac0b  JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/backends/cassandra/Scenario.java  |  11 ++
 .../backends/cassandra/TestingSessionTest.java     |  12 ++
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  32 +++-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |   2 +
 .../apache/james/queue/rabbitmq/MailQueueName.java |  10 ++
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |  13 ++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 189 +++++++++++++++++++++
 7 files changed, 261 insertions(+), 8 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/07: JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa6ac0bf7a5a2403b9ef28a8db50639129ae0007
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 16:55:55 2020 +0700

    JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages
    
    This prevents losing dropped messages
---
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  2 ++
 .../apache/james/queue/rabbitmq/MailQueueName.java | 10 ++++++++
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 13 ++++++++++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 30 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 481e380..c134798 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -38,6 +38,7 @@ import org.apache.mailet.Mail;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 
 import reactor.core.publisher.Mono;
@@ -88,6 +89,7 @@ class Enqueuer {
             .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
             .priority(PERSISTENT_TEXT_PLAIN.getPriority())
             .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .headers(ImmutableMap.of("x-dead-letter-routing-key", EMPTY_ROUTING_KEY))
             .build();
 
         OutboundMessage data = new OutboundMessage(
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
index dfc352d..f70151e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
@@ -112,6 +112,8 @@ public final class MailQueueName {
 
     private static final String PREFIX = "JamesMailQueue";
     private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-";
+    private static final String DEAD_LETTER_EXCHANGE_PREFIX = PREFIX + "-dead-letter-exchange-";
+    private static final String DEAD_LETTER_QUEUE_PREFIX = PREFIX + "-dead-letter-queue-";
     @VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + "-workqueue-";
 
     public static MailQueueName fromString(String name) {
@@ -134,6 +136,14 @@ public final class MailQueueName {
         return name;
     }
 
+    String toDeadLetterExchangeName() {
+        return DEAD_LETTER_EXCHANGE_PREFIX + name;
+    }
+
+    String toDeadLetterQueueName() {
+        return DEAD_LETTER_QUEUE_PREFIX + name;
+    }
+
     ExchangeName toRabbitExchangeName() {
         return new ExchangeName(name);
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index e9b3c32..4dd0181 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -46,6 +46,7 @@ import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
@@ -156,14 +157,26 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
                 .durable(true)
                 .type("direct")),
+            sender.declareExchange(ExchangeSpecification.exchange(mailQueueName.toDeadLetterExchangeName())
+                .durable(true)
+                .type("direct")),
             sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
+                .arguments(ImmutableMap.of("x-dead-letter-exchange", mailQueueName.toDeadLetterExchangeName()))),
+            sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS)),
             sender.bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
+                .routingKey(EMPTY_ROUTING_KEY)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(mailQueueName.toDeadLetterExchangeName())
+                .queue(mailQueueName.toDeadLetterQueueName())
                 .routingKey(EMPTY_ROUTING_KEY)))
             .then()
             .block();
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 0289266..bf2d209 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -35,6 +35,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -327,7 +328,7 @@ class RabbitMQMailQueueTest {
                 .times(1)
                 .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
 
-            List<MailQueue.MailQueueItem> items =  Flux.from(getMailQueue().deQueue())
+            List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
                 .take(3)
                 .collectList()
                 .block(Duration.ofSeconds(10));
@@ -457,6 +458,33 @@ class RabbitMQMailQueueTest {
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
+
+        @Test
+        void rejectedMessagesShouldBeDeadLettered() {
+            String emptyRoutingKey = "";
+            rabbitMQExtension.getSender()
+                .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+                    emptyRoutingKey,
+                    "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+                .block();
+
+            AtomicInteger deadLetteredCount = new AtomicInteger();
+            rabbitMQExtension.getRabbitChannelPool()
+                .createReceiver()
+                .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
+                .doOnNext(next -> deadLetteredCount.incrementAndGet())
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            Flux.from(getMailQueue().deQueue())
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+
+            Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+                .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/07: JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b6d0467bcf1642f1fec8f799615099e9a7e36eb2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 16:29:06 2020 +0700

    JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash
---
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index ee32d8a..4b2bd9b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -30,6 +30,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
@@ -63,6 +64,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import org.apache.james.util.streams.Iterators;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.apache.mailet.Mail;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -76,6 +78,7 @@ import com.github.fge.lambdas.Throwing;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.OutboundMessage;
 
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -378,6 +381,44 @@ class RabbitMQMailQueueTest {
             // We expect content missing blob references to be purged from the queue
             assertThat(dequeuedNames).isEmpty();
         }
+
+        @Disabled("JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash")
+        @Test
+        void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            String emptyRoutingKey = "";
+            rabbitMQExtension.getSender()
+                .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+                    emptyRoutingKey,
+                    "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+                .block();
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
+
+            Flux.from(getMailQueue().deQueue())
+                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribe();
+
+            Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+                .untilAsserted(() -> assertThat(dequeuedMailNames)
+                    .containsExactly(name1, name2, name3));
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/07: JAMES-3138 Cassandra testing instrumentation should allow injecting empty results

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 74896fd4e3d4c0392aa1cb13869070ec0397bf25
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 16:55:32 2020 +0700

    JAMES-3138 Cassandra testing instrumentation should allow injecting empty results
---
 .../java/org/apache/james/backends/cassandra/Scenario.java   | 11 +++++++++++
 .../apache/james/backends/cassandra/TestingSessionTest.java  | 12 ++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
index f5ddbf7..74c4aa8 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/Scenario.java
@@ -54,6 +54,10 @@ public class Scenario {
 
         Behavior EXECUTE_NORMALLY = Session::executeAsync;
 
+        // Hack. We rely on version key unicity (because UUID) to create an empty ResultSet
+        Behavior RETURN_EMPTY = (session, statement) -> session.executeAsync(
+            "SELECT value FROM schemaVersion WHERE key=49128560-bb80-11ea-bad6-e3b96c9cd431;");
+
         static Behavior awaitOn(Barrier barrier, Behavior behavior) {
             return (session, statement) -> {
                 barrier.call();
@@ -163,6 +167,13 @@ public class Scenario {
                 validity);
         }
 
+        static RequiresValidity returnEmpty() {
+            return validity -> statementPredicate -> new ExecutionHook(
+                statementPredicate,
+                Behavior.RETURN_EMPTY,
+                validity);
+        }
+
         static RequiresValidity executeNormally() {
             return validity -> statementPredicate -> new ExecutionHook(
                 statementPredicate,
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
index 6f9987b..e6cd607 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -23,6 +23,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn;
 import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
 import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.TABLE_NAME;
 import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.VALUE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -76,6 +77,17 @@ class TestingSessionTest {
     }
 
     @Test
+    void daoOperationShouldNotBeInstrumentedWhenReturnEmpty(CassandraCluster cassandra) {
+        cassandra.getConf()
+            .registerScenario(returnEmpty()
+                .times(1)
+                .whenQueryStartsWith("SELECT value FROM schemaVersion;"));
+
+        assertThat(dao.getCurrentSchemaVersion().block())
+            .isEmpty();
+    }
+
+    @Test
     void recordStatementsShouldKeepTraceOfExecutedStatement(CassandraCluster cassandra) {
         StatementRecorder statementRecorder = new StatementRecorder();
         cassandra.getConf().recordStatements(statementRecorder);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/07: JAMES-3290 Mails failing to load should not be lost by dequeue

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ea6e8843783d7f43b4badc84860b6c3deadbc62e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 15:45:05 2020 +0700

    JAMES-3290 Mails failing to load should not be lost by dequeue
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 12 ++++++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 33 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b13b613..27fe748 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -33,6 +33,8 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
@@ -46,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
 
 class Dequeuer implements Closeable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class);
     private static final boolean REQUEUE = true;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -136,9 +139,14 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
         return toMailReference(response)
-            .flatMap(mailLoader::load);
+            .flatMap(reference -> mailLoader.load(reference)
+                .onErrorResume(e -> {
+                    LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
+                    response.nack(REQUEUE);
+                    return Mono.empty();
+                }));
     }
 
     private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index d34a953..6bf316b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.queue.rabbitmq;
 
 import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -296,6 +297,38 @@ class RabbitMQMailQueueTest {
                 }))
                 .blockLast();
         }
+
+        @Test
+        void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            cassandra.getConf().registerScenario(fail()
+                .times(1)
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            List<MailQueue.MailQueueItem> items =  Flux.from(getMailQueue().deQueue())
+                .take(3)
+                .collectList()
+                .block(Duration.ofSeconds(10));
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .containsOnly(name1, name2, name3);
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/07: JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message is in an invalid format.

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fd57eb747a7d2216d6be7771a77026305c360abb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 10:01:01 2020 +0700

    JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message is in an invalid format.
    
    The relevant mail data is still stored in the MailQueue browse projection, preventing data loss.
---
 .../src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java     | 6 ++++--
 .../java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java | 1 -
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 09a3df6..bacd1ff 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq;
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.function.Consumer;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
@@ -158,7 +157,10 @@ class Dequeuer implements Closeable {
     private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
         return Mono.fromCallable(getResponse::getBody)
             .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
-            .onErrorResume(IOException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse DTO", e)));
+            .onErrorResume(e -> {
+                LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e);
+                return Mono.empty();
+            });
     }
 
 }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 4b2bd9b..abae166 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -382,7 +382,6 @@ class RabbitMQMailQueueTest {
             assertThat(dequeuedNames).isEmpty();
         }
 
-        @Disabled("JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash")
         @Test
         void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception {
             String name1 = "myMail1";


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/07: JAMES-3290 Many invalid JSON should not abort dequeue

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9f0a943fd2d07ae9d0b26a35ce1293cd0f325efc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 3 09:31:13 2020 +0700

    JAMES-3290 Many invalid JSON should not abort dequeue
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 14 ++++----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 39 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index bacd1ff..fe209f9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.rabbitmq.client.Delivery;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -139,26 +138,27 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
-        return toMailReference(response)
+    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery delivery) {
+        return toMailReference(delivery)
             .flatMap(reference -> mailLoader.load(reference)
                 .onErrorResume(ObjectNotFoundException.class, e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
-                    response.nack(!REQUEUE);
+                    delivery.nack(!REQUEUE);
                     return Mono.empty();
                 })
                 .onErrorResume(e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
-                    response.nack(REQUEUE);
+                    delivery.nack(REQUEUE);
                     return Mono.empty();
                 }));
     }
 
-    private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
-        return Mono.fromCallable(getResponse::getBody)
+    private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery delivery) {
+        return Mono.fromCallable(delivery::getBody)
             .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
             .onErrorResume(e -> {
                 LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e);
+                delivery.nack(!REQUEUE);
                 return Mono.empty();
             });
     }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index abae166..0289266 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -418,6 +418,45 @@ class RabbitMQMailQueueTest {
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
+
+        @Test
+        void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            String emptyRoutingKey = "";
+
+            IntStream.range(0, 100)
+                .forEach(i -> rabbitMQExtension.getSender()
+                    .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+                        emptyRoutingKey,
+                        ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8))))
+                    .block());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
+
+            Flux.from(getMailQueue().deQueue())
+                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribe();
+
+            Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+                .untilAsserted(() -> assertThat(dequeuedMailNames)
+                    .containsExactly(name1, name2, name3));
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/07: JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message references a blob that no longer exists.

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b3ecc4645581be9625ea7f4912886f9075459074
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 17:08:22 2020 +0700

    JAMES-3290 Prevent an infinite loop when RabbitMQMailQueue message references a blob that no longer exists.
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  6 +++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 27fe748..09a3df6 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.function.Consumer;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
@@ -142,6 +143,11 @@ class Dequeuer implements Closeable {
     private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
         return toMailReference(response)
             .flatMap(reference -> mailLoader.load(reference)
+                .onErrorResume(ObjectNotFoundException.class, e -> {
+                    LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
+                    response.nack(!REQUEUE);
+                    return Mono.empty();
+                })
                 .onErrorResume(e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
                     response.nack(REQUEUE);
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 6bf316b..ee32d8a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,7 +20,9 @@
 package org.apache.james.queue.rabbitmq;
 
 import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -72,6 +75,7 @@ import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -329,6 +333,51 @@ class RabbitMQMailQueueTest {
                 .extracting(item -> item.getMail().getName())
                 .containsOnly(name1, name2, name3);
         }
+
+        @Test
+        void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+            // Enqueue three mails; the test then checks that none is delivered once blob reads come back empty
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+            // From now on, queries starting with this blobs SELECT get an injected empty result
+            cassandra.getConf().registerScenario(returnEmpty()
+                .forever()
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            ConcurrentLinkedDeque<String> dequeuedNames = new ConcurrentLinkedDeque<>();
+            Flux.from(getMailQueue().deQueue())
+                .take(3)
+                .doOnNext(item -> dequeuedNames.add(item.getMail().getName()))
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            // Give the dequeuer one second to attempt loading the mails while blob reads return empty results
+            Thread.sleep(1000);
+
+            // Restore normal execution of the blobs SELECT
+            cassandra.getConf().registerScenario(executeNormally()
+                .forever()
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            // Wait one more second so that any message still in the queue would be dequeued and recorded
+            Thread.sleep(1000);
+
+            // Nothing was dequeued: messages referencing missing blobs are expected to be discarded, not requeued
+            assertThat(dequeuedNames).isEmpty();
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org