You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/16 03:24:57 UTC

[james-project] branch master updated (c4fafaf -> c23267a)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from c4fafaf  JAMES-3266 Move disable ElasticSearch option into a separate search.properties configuration file
     new e3b61d3  JAMES-3155 Limit the number of flags updated at the same time
     new c23267a  JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ElasticSearchListeningMessageSearchIndex.java  |  7 ++-
 .../distributed/RabbitMQTerminationSubscriber.java |  6 +--
 .../RabbitMQTerminationSubscriberTest.java         | 54 +++++++++++++++++++++-
 3 files changed, 61 insertions(+), 6 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/02: JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c23267ac9b8021ee000768eb309c7f64c95d7a8f
Author: duc91 <vd...@linagora.com>
AuthorDate: Tue Jul 14 15:01:16 2020 +0700

    JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling
---
 .../distributed/RabbitMQTerminationSubscriber.java |  6 +--
 .../RabbitMQTerminationSubscriberTest.java         | 54 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 2c1812f..46144d7 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -56,9 +56,9 @@ import reactor.rabbitmq.Sender;
 
 public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class);
-    private static final String EXCHANGE_NAME = "terminationSubscriberExchange";
-    private static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
-    private static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
+    static final String EXCHANGE_NAME = "terminationSubscriberExchange";
+    static final String QUEUE_NAME_PREFIX = "terminationSubscriber";
+    static final String ROUTING_KEY = "terminationSubscriberRoutingKey";
 
     private final JsonEventSerializer serializer;
     private final Sender sender;
diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
index 77cfeb7..3da35db 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java
@@ -20,12 +20,19 @@
 
 package org.apache.james.task.eventsourcing.distributed;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.EXCHANGE_NAME;
+import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.ROUTING_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.TEN_SECONDS;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.eventsourcing.Event;
@@ -36,11 +43,14 @@ import org.apache.james.json.DTOConverter;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.task.eventsourcing.TerminationSubscriber;
 import org.apache.james.task.eventsourcing.TerminationSubscriberContract;
+import org.assertj.core.api.SoftAssertions;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.OutboundMessage;
 
 class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract {
     private static final JsonTaskSerializer TASK_SERIALIZER = JsonTaskSerializer.of();
@@ -52,7 +62,8 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
 
     @Override
     public TerminationSubscriber subscriber() {
-        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER);
+        RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(),
+            rabbitMQExtension.getReceiverProvider(), SERIALIZER);
         subscriber.start();
         return subscriber;
     }
@@ -77,4 +88,45 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract
         assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
         assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT);
     }
+
+    @Test
+    void eventProcessingShouldNotCrashOnInvalidMessage() {
+        TerminationSubscriber subscriber1 = subscriber();
+        Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+                ROUTING_KEY,
+                "BAD_PAYLOAD!".getBytes(UTF_8))))
+            .block();
+
+        sendEvents(subscriber1, COMPLETED_EVENT);
+
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+
+        await().timeout(TEN_SECONDS).untilAsserted(() -> assertThat(receivedEventsFirst).hasSize(1));
+    }
+
+    @Test
+    void eventProcessingShouldNotCrashOnInvalidMessages() {
+        TerminationSubscriber subscriber1 = subscriber();
+        Flux<Event> firstListener = Flux.from(subscriber1.listenEvents());
+
+        IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME,
+                ROUTING_KEY,
+                "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+            .block());
+
+        sendEvents(subscriber1, COMPLETED_EVENT);
+
+        List<Event> receivedEventsFirst = new ArrayList<>();
+        firstListener.subscribe(receivedEventsFirst::add);
+
+        await().atMost(ONE_MINUTE).untilAsserted(() ->
+            SoftAssertions.assertSoftly(soft -> {
+                assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT);
+            }));
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/02: JAMES-3155 Limit the number of flags updated at the same time

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e3b61d371aa5dd98ef65edab12c4390a50b79636
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 15 12:11:06 2020 +0700

    JAMES-3155 Limit the number of flags updated at the same time
    
    Large flags operations caused expensive updates to be performed "at once"
    by ElasticSearch.
    
    We should rather limit the number of flags being modified at once, and perform them "by batch".
---
 .../events/ElasticSearchListeningMessageSearchIndex.java           | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 64f4d89..2d0df24 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -77,6 +77,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
+    private static final int FLAGS_UPDATE_PROCESSING_WINDOW_SIZE = 32;
+
     public static class ElasticSearchListeningMessageSearchIndexGroup extends Group {
 
     }
@@ -203,8 +205,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
                 updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailboxId, updatedFlags))
                 .sneakyThrow())
-            .collect(toImmutableList())
-            .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))
+            .window(FLAGS_UPDATE_PROCESSING_WINDOW_SIZE)
+            .concatMap(flux -> flux.collect(toImmutableList())
+                .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey)))
             .then();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org