You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/03 02:09:43 UTC

[james-project] branch master updated (f08f82f -> 4900a05)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from f08f82f  JAMES-3078 MDC hierarchical MDC context for reactor
     new 0afb84f  JAMES-2990 Reduce consistency level: CassandraMessageFastViewProjection::store
     new d58d7e9  JAMES-3058 Add a confirmation header to call SolveMailboxInconsistency task
     new 8227159  JAMES-3130 Update stored state when a message is two time in the same mailbox
     new 4900a05  JAMES-3130 MessageIdMapper::setFlags should return all results

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/mail/CassandraMessageIdMapper.java   | 33 +++++-----
 .../inmemory/mail/InMemoryMessageIdMapper.java     | 12 ++--
 .../james/mailbox/store/StoreMessageIdManager.java | 13 ++--
 .../james/mailbox/store/mail/MessageIdMapper.java  | 12 +++-
 .../store/mail/model/MessageIdMapperTest.java      | 71 ++++++++++++++++------
 .../CassandraMessageFastViewProjection.java        | 16 ++++-
 .../RabbitMQWebAdminServerIntegrationTest.java     |  1 +
 .../SolveMailboxInconsistenciesRequestToTask.java  | 12 +++-
 src/site/markdown/server/manage-webadmin.md        |  3 +
 9 files changed, 119 insertions(+), 54 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/04: JAMES-3130 Update stored state when a message is two time in the same mailbox

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8227159a964d5b621767b0b29b3aee31c77f80be
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 28 15:11:52 2020 +0700

    JAMES-3130 Update stored state when a message is two time in the same mailbox
    
    If a messageId is contained 2 times in a single mailbox with 2 different
    uids, the flags update will fail with a
    `java.lang.IndexOutOfBoundsException: Source emitted more than one item`
    error.
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 27 +++++++++++-----------
 .../inmemory/mail/InMemoryMessageIdMapper.java     | 10 +++++++-
 .../store/mail/model/MessageIdMapperTest.java      | 17 +++++++++++++-
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 750b6bd..3aeb060 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiFunction;
 
 import javax.mail.Flags;
 
@@ -58,6 +59,7 @@ import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
+    public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a;
 
     private final MailboxMapper mailboxMapper;
     private final CassandraMailboxDAO mailboxDAO;
@@ -212,7 +214,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
             .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
             .flatMap(this::updateCounts)
-            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight))
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST))
             .block();
     }
 
@@ -222,15 +224,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     }
 
     private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
-        try {
-            return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
-                .single()
-                .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
-                .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
-        } catch (MailboxDeleteDuringUpdateException e) {
-            LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
-            return Mono.empty();
-        }
+        return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
+            .single()
+            .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
+            .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
+            .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
+                LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
+                return Mono.empty();
+            });
     }
 
     private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
@@ -261,9 +262,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
-            .single()
-            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
-            .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId));
+            .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
+            .next()
+            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
     }
 
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index c381f5b..a9dbaf5 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -24,7 +24,9 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BinaryOperator;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import javax.mail.Flags;
 
@@ -45,8 +47,10 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class InMemoryMessageIdMapper implements MessageIdMapper {
+    private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
 
     private final MailboxMapper mailboxMapper;
     private final InMemoryMessageMapper messageMapper;
@@ -122,7 +126,11 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
             .stream()
             .filter(message -> mailboxIds.contains(message.getMailboxId()))
             .map(updateMessage(newState, updateMode))
-            .collect(Guavate.entriesToMap());
+            .distinct()
+            .collect(Guavate.toImmutableMap(
+                Pair::getKey,
+                Pair::getValue,
+                KEEP_FIRST));
     }
 
     private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 9939095..3de1f43 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -42,6 +42,7 @@ import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
@@ -71,7 +72,7 @@ public abstract class MessageIdMapperTest {
     private MailboxMapper mailboxMapper;
     private MessageIdMapper sut;
 
-    private Mailbox benwaInboxMailbox;
+    protected Mailbox benwaInboxMailbox;
     private Mailbox benwaWorkMailbox;
     
     protected SimpleMailboxMessage message1;
@@ -928,6 +929,20 @@ public abstract class MessageIdMapperTest {
     }
 
     @Test
+    void setFlagsShouldUpdateTwoMessagesInTheSameMailboxWithTheSameMessageId() throws Exception {
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+        sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+
+        assertThat(sut.find(ImmutableList.of(message1.getMessageId()), FetchType.Metadata))
+            .extracting(MailboxMessage::createFlags)
+            .containsExactly(
+                new Flags(Flag.ANSWERED),
+                new Flags(Flag.ANSWERED));
+    }
+
+    @Test
     void deletesShouldUpdateUnreadCount() throws Exception {
         message1.setUid(mapperProvider.generateMessageUid());
         message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/04: JAMES-3130 MessageIdMapper::setFlags should return all results

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4900a05607e7005654bc1347c3e0e6c700a155a3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 31 17:00:24 2020 +0700

    JAMES-3130 MessageIdMapper::setFlags should return all results
    
    When a message (identified by its messageId) is stored several times in
    the same mailbox, setFlags should return the UpdatedFlags of every copy
    rather than keeping only the first one.
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 22 ++++-----
 .../inmemory/mail/InMemoryMessageIdMapper.java     | 16 ++-----
 .../james/mailbox/store/StoreMessageIdManager.java | 13 +++---
 .../james/mailbox/store/mail/MessageIdMapper.java  | 12 ++++-
 .../store/mail/model/MessageIdMapperTest.java      | 54 ++++++++++++++--------
 5 files changed, 67 insertions(+), 50 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3aeb060..0eeca22 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -21,9 +21,7 @@ package org.apache.james.mailbox.cassandra.mail;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiFunction;
 
 import javax.mail.Flags;
 
@@ -59,7 +57,6 @@ import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
-    public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a;
 
     private final MailboxMapper mailboxMapper;
     private final CassandraMailboxDAO mailboxDAO;
@@ -207,14 +204,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     }
 
     @Override
-    public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+    public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         return Flux.fromIterable(mailboxIds)
             .distinct()
             .map(mailboxId -> (CassandraId) mailboxId)
             .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
             .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
             .flatMap(this::updateCounts)
-            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST))
+            .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
             .block();
     }
 
@@ -223,15 +220,16 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .hasElements();
     }
 
-    private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+    private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
             .single()
             .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
-            .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
             .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
                 LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
                 return Mono.empty();
-            });
+            })
+            .flatMapMany(Flux::fromIterable)
+            .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
     }
 
     private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
@@ -250,7 +248,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .thenReturn(pair);
     }
 
-    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         try {
             return updateFlags(mailboxId, messageId, newState, updateMode);
         } catch (MailboxException e) {
@@ -259,12 +257,12 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         }
     }
 
-    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
             .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
-            .next()
-            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
+            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+            .collectList();
     }
 
     private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index a9dbaf5..bc34599 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -23,10 +23,7 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.function.BinaryOperator;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import javax.mail.Flags;
 
@@ -47,11 +44,9 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
 
 public class InMemoryMessageIdMapper implements MessageIdMapper {
-    private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
-
     private final MailboxMapper mailboxMapper;
     private final InMemoryMessageMapper messageMapper;
 
@@ -120,17 +115,16 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
     }
 
     @Override
-    public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
-                                                 Flags newState, FlagsUpdateMode updateMode) throws MailboxException {
+    public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
+                                                      Flags newState, FlagsUpdateMode updateMode) {
         return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
             .stream()
             .filter(message -> mailboxIds.contains(message.getMailboxId()))
             .map(updateMessage(newState, updateMode))
             .distinct()
-            .collect(Guavate.toImmutableMap(
+            .collect(Guavate.toImmutableListMultimap(
                 Pair::getKey,
-                Pair::getValue,
-                KEEP_FIRST));
+                Pair::getValue));
     }
 
     private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 7f10620..7e37bf8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -74,6 +74,7 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import reactor.core.publisher.Flux;
@@ -117,9 +118,9 @@ public class StoreMessageIdManager implements MessageIdManager {
 
         assertRightsOnMailboxes(mailboxIds, mailboxSession, Right.Write);
 
-        Map<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
-        for (Map.Entry<MailboxId, UpdatedFlags> entry : updatedFlags.entrySet()) {
-            dispatchFlagsChange(mailboxSession, entry.getKey(), entry.getValue());
+        Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
+        for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
+            dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()));
         }
     }
 
@@ -322,15 +323,15 @@ public class StoreMessageIdManager implements MessageIdManager {
         }
     }
     
-    private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, UpdatedFlags updatedFlags) throws MailboxException {
-        if (updatedFlags.flagsChanged()) {
+    private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags) throws MailboxException {
+        if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
             Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
 
             eventBus.dispatch(EventFactory.flagsUpdated()
                 .randomEventId()
                 .mailboxSession(mailboxSession)
                 .mailbox(mailbox)
-                .updatedFlag(updatedFlags)
+                .updatedFlags(updatedFlags)
                 .build(),
                 new MailboxIdRegistrationKey(mailboxId))
                 .block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 2f9457f..094ce72 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.mail;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import javax.mail.Flags;
 
@@ -54,5 +53,14 @@ public interface MessageIdMapper {
             .forEach(this::delete);
     }
 
-    Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
+    /**
+     * Updates the flags of the messages with the given MessageId in the supplied mailboxes
+     *
+     * More than one message can be updated when a message is contained several times in the same mailbox with distinct
+     * MessageUid.
+     *
+     * @return Metadata of the update, indexed by mailboxIds.
+     * @throws MailboxException
+     */
+    Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 3de1f43..0c9fc61 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
 
 public abstract class MessageIdMapperTest {
     private static final Username BENWA = Username.of("benwa");
@@ -367,7 +368,7 @@ public abstract class MessageIdMapperTest {
 
         MessageId messageId = message1.getMessageId();
         Flags newFlags = new Flags(Flag.ANSWERED);
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
 
         ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
         UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -376,7 +377,7 @@ public abstract class MessageIdMapperTest {
             .oldFlags(new Flags())
             .newFlags(newFlags)
             .build();
-        assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+        assertThat(flags.asMap()).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
     }
 
     @Test
@@ -394,7 +395,7 @@ public abstract class MessageIdMapperTest {
             .add("userflag")
             .build();
 
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
 
         ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
         UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -404,7 +405,8 @@ public abstract class MessageIdMapperTest {
             .newFlags(newFlags)
             .build();
 
-        assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+        assertThat(flags.asMap())
+            .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
     }
 
     @Test
@@ -422,7 +424,7 @@ public abstract class MessageIdMapperTest {
             .add("userflag")
             .build();
 
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
 
         ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
         UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -432,7 +434,8 @@ public abstract class MessageIdMapperTest {
             .newFlags(new Flags(Flags.Flag.RECENT))
             .build();
 
-        assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+        assertThat(flags.asMap())
+            .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
     }
 
     @Test
@@ -466,17 +469,17 @@ public abstract class MessageIdMapperTest {
 
         MessageId messageId = message1.getMessageId();
         Flags newFlags = new Flags(Flag.ANSWERED);
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
 
-        assertThat(flags).isEmpty();
+        assertThat(flags.asMap()).isEmpty();
     }
 
     @Test
     void setFlagsShouldReturnEmptyWhenMessageIdDoesntExist() throws Exception {
         MessageId unknownMessageId = mapperProvider.generateMessageId();
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
 
-        assertThat(flags).isEmpty();
+        assertThat(flags.asMap()).isEmpty();
     }
 
     @Test
@@ -489,7 +492,7 @@ public abstract class MessageIdMapperTest {
 
         MessageId messageId = message1.getMessageId();
 
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
 
         Flags newFlags = new FlagsBuilder()
             .add(Flag.RECENT)
@@ -502,7 +505,8 @@ public abstract class MessageIdMapperTest {
             .oldFlags(initialFlags)
             .newFlags(newFlags)
             .build();
-        assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+        assertThat(flags.asMap())
+            .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
     }
 
     @Test
@@ -518,7 +522,7 @@ public abstract class MessageIdMapperTest {
 
         MessageId messageId = message1.getMessageId();
         Flags newFlags = new Flags(Flag.ANSWERED);
-        Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+        Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
 
         ModSeq modSeqBenwaInboxMailbox = mapperProvider.highestModSeq(benwaInboxMailbox);
         ModSeq modSeqBenwaWorkMailbox = mapperProvider.highestModSeq(benwaWorkMailbox);
@@ -534,8 +538,9 @@ public abstract class MessageIdMapperTest {
             .oldFlags(new Flags())
             .newFlags(newFlags)
             .build();
-        assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags),
-                MapEntry.entry(message1InOtherMailbox.getMailboxId(), expectedUpdatedFlags2));
+        assertThat(flags.asMap())
+            .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)),
+                MapEntry.entry(message1InOtherMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags2)));
     }
 
     @Test
@@ -857,19 +862,19 @@ public abstract class MessageIdMapperTest {
         message1.setFlags(flags);
         sut.save(message1);
 
-        Map<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
+        Multimap<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
             ImmutableList.of(message1.getMailboxId()),
             flags,
             FlagsUpdateMode.ADD);
 
-        assertThat(mailboxIdUpdatedFlagsMap)
+        assertThat(mailboxIdUpdatedFlagsMap.asMap())
             .containsOnly(MapEntry.entry(message1.getMailboxId(),
-                UpdatedFlags.builder()
+                ImmutableList.of(UpdatedFlags.builder()
                     .modSeq(modSeq)
                     .uid(message1.getUid())
                     .newFlags(flags)
                     .oldFlags(flags)
-                    .build()));
+                    .build())));
     }
 
     @Test
@@ -943,6 +948,17 @@ public abstract class MessageIdMapperTest {
     }
 
     @Test
+    void setFlagsShouldReturnAllUp() throws Exception {
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+        addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+        Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+
+        assertThat(map.values()).hasSize(2);
+        assertThat(map.asMap()).hasSize(1);
+    }
+
+    @Test
     void deletesShouldUpdateUnreadCount() throws Exception {
         message1.setUid(mapperProvider.generateMessageUid());
         message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/04: JAMES-3058 Add a confirmation header to call SolveMailboxInconsistency task

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d58d7e9c72511165b3974e85ecf4d3b528cef878
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 27 18:08:55 2020 +0700

    JAMES-3058 Add a confirmation header to call SolveMailboxInconsistency task
---
 .../rabbitmq/RabbitMQWebAdminServerIntegrationTest.java      |  1 +
 .../routes/SolveMailboxInconsistenciesRequestToTask.java     | 12 +++++++++++-
 src/site/markdown/server/manage-webadmin.md                  |  3 +++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
index dcf26ba..0f7b1e8 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
@@ -178,6 +178,7 @@ class RabbitMQWebAdminServerIntegrationTest extends WebAdminServerIntegrationTes
             .body("status", is("completed"));
 
         taskId = with()
+            .header("I-KNOW-WHAT-I-M-DOING", "ALL-SERVICES-ARE-OFFLINE")
             .queryParam("task", "SolveInconsistencies")
         .post("/mailboxes")
             .jsonPath()
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
index 7418226..064dca5 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
@@ -26,12 +26,22 @@ import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesT
 import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
 import org.apache.james.webadmin.tasks.TaskRegistrationKey;
 
+import com.google.common.base.Preconditions;
+
 public class SolveMailboxInconsistenciesRequestToTask extends TaskFromRequestRegistry.TaskRegistration {
     private static final TaskRegistrationKey REGISTRATION_KEY = TaskRegistrationKey.of("SolveInconsistencies");
 
     @Inject
     public SolveMailboxInconsistenciesRequestToTask(SolveMailboxInconsistenciesService service) {
         super(REGISTRATION_KEY,
-            request -> new SolveMailboxInconsistenciesTask(service));
+            request -> {
+                // Compare constant-first: if the header is absent and headers(...) returns null, the check
+                // must fail with the message below instead of throwing a NullPointerException.
+                Preconditions.checkArgument("ALL-SERVICES-ARE-OFFLINE".equalsIgnoreCase(request.headers("I-KNOW-WHAT-I-M-DOING")),
+                    "Due to concurrency risks, a `I-KNOW-WHAT-I-M-DOING` header should be positioned to " +
+                        "`ALL-SERVICES-ARE-OFFLINE` in order to prevent accidental calls. " +
+                        "Check the documentation for details.");
+                return new SolveMailboxInconsistenciesTask(service);
+            });
     }
 }
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 042752b..e88532e 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -453,6 +453,9 @@ A dirty read is when data is read between the two writes of the denormalization
 In order to ensure being offline, stop the traffic on SMTP, JMAP and IMAP ports, for example via re-configuration or 
 firewall rules.
 
+Because of all these risks, an `I-KNOW-WHAT-I-M-DOING` header must be set to `ALL-SERVICES-ARE-OFFLINE` in order 
+to prevent accidental calls.
+
 #### Recomputing mailbox counters
 
 This task is only available on top of Guice Cassandra products.


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/04: JAMES-2990 Reduce consistency level: CassandraMessageFastViewProjection::store

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0afb84f2e8909346e76aca6d138168d1dcb0c153
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 5 09:22:17 2020 +0700

    JAMES-2990 Reduce consistency level: CassandraMessageFastViewProjection::store
    
    Rationale: an outdated projection is handled very well on the read path, so
    inconsistencies are acceptable.
    
    We have been noticing failed writes (partial quorum writes) and relying on
    Cassandra consistency mechanisms for this (hinted handoff, etc...) seems like
    an acceptable solution.
---
 .../projections/CassandraMessageFastViewProjection.java  | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
index 628a3b6..21352c8 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
@@ -39,7 +39,10 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
@@ -50,6 +53,7 @@ import reactor.core.publisher.Mono;
 
 public class CassandraMessageFastViewProjection implements MessageFastViewProjection {
 
+    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageFastViewProjection.class);
     private final Metric metricRetrieveHitCount;
     private final Metric metricRetrieveMissCount;
 
@@ -90,7 +94,8 @@ public class CassandraMessageFastViewProjection implements MessageFastViewProjec
         return cassandraAsyncExecutor.executeVoid(storeStatement.bind()
             .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())
             .setString(PREVIEW, precomputedProperties.getPreview().getValue())
-            .setBool(HAS_ATTACHMENT, precomputedProperties.hasAttachment()));
+            .setBool(HAS_ATTACHMENT, precomputedProperties.hasAttachment())
+            .setConsistencyLevel(ConsistencyLevel.ONE));
     }
 
     @Override
@@ -98,10 +103,15 @@ public class CassandraMessageFastViewProjection implements MessageFastViewProjec
         checkMessage(messageId);
 
         return cassandraAsyncExecutor.executeSingleRow(retrieveStatement.bind()
-                .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get()))
+                .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())
+                .setConsistencyLevel(ConsistencyLevel.ONE))
             .map(this::fromRow)
             .doOnNext(preview -> metricRetrieveHitCount.increment())
-            .switchIfEmpty(Mono.fromRunnable(metricRetrieveMissCount::increment));
+            .switchIfEmpty(Mono.fromRunnable(metricRetrieveMissCount::increment))
+            .onErrorResume(e -> {
+                LOGGER.error("Error while retrieving MessageFastView projection item for {}", messageId, e);
+                return Mono.empty();
+            });
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org