Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/16 03:24:58 UTC

[james-project] 01/02: JAMES-3155 Limit the number of flags updated at the same time

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e3b61d371aa5dd98ef65edab12c4390a50b79636
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 15 12:11:06 2020 +0700

    JAMES-3155 Limit the number of flags updated at the same time
    
    Large flag operations caused expensive updates to be performed "at once"
    by ElasticSearch.
    
    We should rather limit the number of flags being modified at once, and perform them "by batch".
---
 .../events/ElasticSearchListeningMessageSearchIndex.java           | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 64f4d89..2d0df24 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -77,6 +77,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
+    private static final int FLAGS_UPDATE_PROCESSING_WINDOW_SIZE = 32;
+
     public static class ElasticSearchListeningMessageSearchIndexGroup extends Group {
 
     }
@@ -203,8 +205,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
                 updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailboxId, updatedFlags))
                 .sneakyThrow())
-            .collect(toImmutableList())
-            .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))
+            .window(FLAGS_UPDATE_PROCESSING_WINDOW_SIZE)
+            .concatMap(flux -> flux.collect(toImmutableList())
+                .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey)))
             .then();
     }
 

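The batching behaviour introduced by the patch can be illustrated without Reactor. The following is only a plain-Java sketch of the idea, not the James code: the class name `FlagUpdateBatching`, the helper `partition`, and the `Consumer` standing in for `elasticSearchIndexer.update` are all hypothetical names chosen for this example. It assumes that `window(32)` followed by `concatMap` yields consecutive, ordered groups of at most 32 updates that are processed one after the other, which the sketch mimics synchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FlagUpdateBatching {
    // Same value as FLAGS_UPDATE_PROCESSING_WINDOW_SIZE in the patch.
    public static final int WINDOW_SIZE = 32;

    // Splits the updates into consecutive batches of at most windowSize
    // elements, preserving the original order.
    public static <T> List<List<T>> partition(List<T> updates, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < updates.size(); start += windowSize) {
            int end = Math.min(start + windowSize, updates.size());
            batches.add(new ArrayList<>(updates.subList(start, end)));
        }
        return batches;
    }

    // Hands each batch to the indexer, one batch after the other.
    // The indexer is a hypothetical stand-in for the real update call.
    public static <T> int updateInBatches(List<T> updates, Consumer<List<T>> indexer) {
        List<List<T>> batches = partition(updates, WINDOW_SIZE);
        batches.forEach(indexer);
        return batches.size();
    }

    public static void main(String[] args) {
        List<Integer> updates = new ArrayList<>();
        for (int i = 0; i < 70; i++) {
            updates.add(i);
        }
        // Prints "batch of 32", "batch of 32", "batch of 6".
        updateInBatches(updates, batch -> System.out.println("batch of " + batch.size()));
    }
}
```

Unlike this sketch, the Reactor pipeline in the patch is asynchronous: each window is collected and sent as its own update request, and `concatMap` only moves on to the next window once the previous request has completed.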
