You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/16 03:24:58 UTC
[james-project] 01/02: JAMES-3155 Limit the number of flags updated
at the same time
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e3b61d371aa5dd98ef65edab12c4390a50b79636
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 15 12:11:06 2020 +0700
JAMES-3155 Limit the number of flags updated at the same time
Large flags operations caused expensive updates to be performed "at once"
by ElasticSearch.
We should rather limit the number of flags being modified at once, and perform then "by batch".
---
.../events/ElasticSearchListeningMessageSearchIndex.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 64f4d89..2d0df24 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -77,6 +77,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
+ private static final int FLAGS_UPDATE_PROCESSING_WINDOW_SIZE = 32;
+
public static class ElasticSearchListeningMessageSearchIndexGroup extends Group {
}
@@ -203,8 +205,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
.map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailboxId, updatedFlags))
.sneakyThrow())
- .collect(toImmutableList())
- .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))
+ .window(FLAGS_UPDATE_PROCESSING_WINDOW_SIZE)
+ .concatMap(flux -> flux.collect(toImmutableList())
+ .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey)))
.then();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org