You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:18 UTC

[james-project] 21/29: JAMES-3075 CassandraIndexTableHandler should use Flux::mergeDelayError

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 117f5c79df61ee7023766eaec09556d649e8c310
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 27 15:19:40 2020 +0700

    JAMES-3075 CassandraIndexTableHandler should use Flux::mergeDelayError
---
 .../cassandra/mail/CassandraIndexTableHandler.java | 45 +++++++++++-----------
 .../mail/CassandraIndexTableHandlerTest.java       |  2 -
 2 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 84516fa..2232234 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.concurrent.Queues;
 
 public class CassandraIndexTableHandler {
 
@@ -57,36 +58,36 @@ public class CassandraIndexTableHandler {
     public Mono<Void> updateIndexOnDelete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, CassandraId mailboxId) {
         MessageUid uid = composedMessageIdWithMetaData.getComposedMessageId().getUid();
 
-        return Flux.merge(
-               updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()),
-               mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()),
-               mailboxCounterDAO.decrementCount(mailboxId),
-               deletedMessageDAO.removeDeleted(mailboxId, uid),
-               decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags()))
-                .then();
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()),
+                mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()),
+                mailboxCounterDAO.decrementCount(mailboxId),
+                deletedMessageDAO.removeDeleted(mailboxId, uid),
+                decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags()))
+            .then();
     }
 
     public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
         Flags flags = message.createFlags();
 
-        return Flux.merge(
-               checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()),
-               updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()),
-               addRecentOnSave(mailboxId, message),
-               incrementUnseenOnSave(mailboxId, flags),
-               mailboxCounterDAO.incrementCount(mailboxId),
-               applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(flags.getUserFlags())))
-                .then();
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()),
+                updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()),
+                addRecentOnSave(mailboxId, message),
+                incrementUnseenOnSave(mailboxId, flags),
+                mailboxCounterDAO.incrementCount(mailboxId),
+                applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(flags.getUserFlags())))
+            .then();
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        return Flux.merge(
-               manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
-               manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
-               updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
-               applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
-               updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
-                .then();
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
+                manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
+                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
+                applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
+                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
+            .then();
     }
 
     private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
index 6259819..4ba6fbb 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
@@ -50,7 +50,6 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.MessageBuilder;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -103,7 +102,6 @@ class CassandraIndexTableHandlerTest {
 
     @Nested
     class Failures {
-        @Disabled("JAMES-3075 prove that CassandraIndexTableHandler don't handle errors gracefully")
         @Test
         void messageCountShouldBeUpdatedUponDeletedMessageFailure(CassandraCluster cassandra) throws Exception {
             MailboxMessage message = new MessageBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org