You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/30 08:27:15 UTC
[james-project] 03/08: JAMES-3319 Mitigate CassandraMailQueueView
deletion concurrency
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2676b9c80085e0a5af98df0cedc399daf99054e1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 28 15:11:07 2020 +0700
JAMES-3319 Mitigate CassandraMailQueueView deletion concurrency
---
.../james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 4a9bad8..754294f 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -73,6 +73,8 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
}
}
+ private static final int DELETION_CONCURRENCY = 8;
+
private final CassandraMailQueueMailStore storeHelper;
private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
@@ -144,7 +146,7 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
.map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
.filter(deleteCondition::shouldBeDeleted)
.flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)
- .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))))
+ .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), DELETION_CONCURRENCY)
.count()
.doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
.subscribeOn(Schedulers.elastic())
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org