You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/03 02:09:43 UTC
[james-project] branch master updated (f08f82f -> 4900a05)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.
from f08f82f JAMES-3078 MDC hierarchical MDC context for reactor
new 0afb84f JAMES-2990 Reduce consistency level: CassandraMessageFastViewProjection::store
new d58d7e9 JAMES-3058 Add a confirmation header to call SolveMailboxInconsistency task
new 8227159 JAMES-3130 Update stored state when a message is two time in the same mailbox
new 4900a05 JAMES-3130 MessageIdMapper::setFlags should return all results
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../cassandra/mail/CassandraMessageIdMapper.java | 33 +++++-----
.../inmemory/mail/InMemoryMessageIdMapper.java | 12 ++--
.../james/mailbox/store/StoreMessageIdManager.java | 13 ++--
.../james/mailbox/store/mail/MessageIdMapper.java | 12 +++-
.../store/mail/model/MessageIdMapperTest.java | 71 ++++++++++++++++------
.../CassandraMessageFastViewProjection.java | 16 ++++-
.../RabbitMQWebAdminServerIntegrationTest.java | 1 +
.../SolveMailboxInconsistenciesRequestToTask.java | 12 +++-
src/site/markdown/server/manage-webadmin.md | 3 +
9 files changed, 119 insertions(+), 54 deletions(-)
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 03/04: JAMES-3130 Update stored state when a
message is two time in the same mailbox
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8227159a964d5b621767b0b29b3aee31c77f80be
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Mar 28 15:11:52 2020 +0700
JAMES-3130 Update stored state when a message is two time in the same mailbox
If a messageId is contained 2 times in a single mailbox with 2 different
uids update will fail with a
`java.lang.IndexOutOfBoundsException: Source emitted more than one item`
error.
---
.../cassandra/mail/CassandraMessageIdMapper.java | 27 +++++++++++-----------
.../inmemory/mail/InMemoryMessageIdMapper.java | 10 +++++++-
.../store/mail/model/MessageIdMapperTest.java | 17 +++++++++++++-
3 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 750b6bd..3aeb060 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.BiFunction;
import javax.mail.Flags;
@@ -58,6 +59,7 @@ import reactor.core.scheduler.Schedulers;
public class CassandraMessageIdMapper implements MessageIdMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
+ public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a;
private final MailboxMapper mailboxMapper;
private final CassandraMailboxDAO mailboxDAO;
@@ -212,7 +214,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
.concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
.flatMap(this::updateCounts)
- .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight))
+ .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST))
.block();
}
@@ -222,15 +224,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
- try {
- return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
- .single()
- .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
- .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
- } catch (MailboxDeleteDuringUpdateException e) {
- LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
- return Mono.empty();
- }
+ return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
+ .single()
+ .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
+ .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
+ .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
+ LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
+ return Mono.empty();
+ });
}
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
@@ -261,9 +262,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
- .single()
- .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
- .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId));
+ .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
+ .next()
+ .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
}
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index c381f5b..a9dbaf5 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -24,7 +24,9 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.function.BinaryOperator;
import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.mail.Flags;
@@ -45,8 +47,10 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
public class InMemoryMessageIdMapper implements MessageIdMapper {
+ private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
private final MailboxMapper mailboxMapper;
private final InMemoryMessageMapper messageMapper;
@@ -122,7 +126,11 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
.stream()
.filter(message -> mailboxIds.contains(message.getMailboxId()))
.map(updateMessage(newState, updateMode))
- .collect(Guavate.entriesToMap());
+ .distinct()
+ .collect(Guavate.toImmutableMap(
+ Pair::getKey,
+ Pair::getValue,
+ KEEP_FIRST));
}
private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 9939095..3de1f43 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -42,6 +42,7 @@ import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.MailboxMapper;
@@ -71,7 +72,7 @@ public abstract class MessageIdMapperTest {
private MailboxMapper mailboxMapper;
private MessageIdMapper sut;
- private Mailbox benwaInboxMailbox;
+ protected Mailbox benwaInboxMailbox;
private Mailbox benwaWorkMailbox;
protected SimpleMailboxMessage message1;
@@ -928,6 +929,20 @@ public abstract class MessageIdMapperTest {
}
@Test
+ void setFlagsShouldUpdateTwoMessagesInTheSameMailboxWithTheSameMessageId() throws Exception {
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+ sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+
+ assertThat(sut.find(ImmutableList.of(message1.getMessageId()), FetchType.Metadata))
+ .extracting(MailboxMessage::createFlags)
+ .containsExactly(
+ new Flags(Flag.ANSWERED),
+ new Flags(Flag.ANSWERED));
+ }
+
+ @Test
void deletesShouldUpdateUnreadCount() throws Exception {
message1.setUid(mapperProvider.generateMessageUid());
message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 04/04: JAMES-3130 MessageIdMapper::setFlags should
return all results
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4900a05607e7005654bc1347c3e0e6c700a155a3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 31 17:00:24 2020 +0700
JAMES-3130 MessageIdMapper::setFlags should return all results
When a message (identified by its messageId) is stored several time in
the same mailbox
---
.../cassandra/mail/CassandraMessageIdMapper.java | 22 ++++-----
.../inmemory/mail/InMemoryMessageIdMapper.java | 16 ++-----
.../james/mailbox/store/StoreMessageIdManager.java | 13 +++---
.../james/mailbox/store/mail/MessageIdMapper.java | 12 ++++-
.../store/mail/model/MessageIdMapperTest.java | 54 ++++++++++++++--------
5 files changed, 67 insertions(+), 50 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3aeb060..0eeca22 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -21,9 +21,7 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.function.BiFunction;
import javax.mail.Flags;
@@ -59,7 +57,6 @@ import reactor.core.scheduler.Schedulers;
public class CassandraMessageIdMapper implements MessageIdMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
- public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a;
private final MailboxMapper mailboxMapper;
private final CassandraMailboxDAO mailboxDAO;
@@ -207,14 +204,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
@Override
- public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
return Flux.fromIterable(mailboxIds)
.distinct()
.map(mailboxId -> (CassandraId) mailboxId)
.filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
.concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
.flatMap(this::updateCounts)
- .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST))
+ .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
.block();
}
@@ -223,15 +220,16 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.hasElements();
}
- private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
.single()
.retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
- .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
.onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
return Mono.empty();
- });
+ })
+ .flatMapMany(Flux::fromIterable)
+ .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
}
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
@@ -250,7 +248,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.thenReturn(pair);
}
- private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
try {
return updateFlags(mailboxId, messageId, newState, updateMode);
} catch (MailboxException e) {
@@ -259,12 +257,12 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
}
- private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
- .next()
- .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
+ .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+ .collectList();
}
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index a9dbaf5..bc34599 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -23,10 +23,7 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.function.BinaryOperator;
import java.util.function.Function;
-import java.util.stream.Collectors;
import javax.mail.Flags;
@@ -47,11 +44,9 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
public class InMemoryMessageIdMapper implements MessageIdMapper {
- private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
-
private final MailboxMapper mailboxMapper;
private final InMemoryMessageMapper messageMapper;
@@ -120,17 +115,16 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
}
@Override
- public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
- Flags newState, FlagsUpdateMode updateMode) throws MailboxException {
+ public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
+ Flags newState, FlagsUpdateMode updateMode) {
return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
.stream()
.filter(message -> mailboxIds.contains(message.getMailboxId()))
.map(updateMessage(newState, updateMode))
.distinct()
- .collect(Guavate.toImmutableMap(
+ .collect(Guavate.toImmutableListMultimap(
Pair::getKey,
- Pair::getValue,
- KEEP_FIRST));
+ Pair::getValue));
}
private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 7f10620..7e37bf8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -74,6 +74,7 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import reactor.core.publisher.Flux;
@@ -117,9 +118,9 @@ public class StoreMessageIdManager implements MessageIdManager {
assertRightsOnMailboxes(mailboxIds, mailboxSession, Right.Write);
- Map<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
- for (Map.Entry<MailboxId, UpdatedFlags> entry : updatedFlags.entrySet()) {
- dispatchFlagsChange(mailboxSession, entry.getKey(), entry.getValue());
+ Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
+ for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
+ dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()));
}
}
@@ -322,15 +323,15 @@ public class StoreMessageIdManager implements MessageIdManager {
}
}
- private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, UpdatedFlags updatedFlags) throws MailboxException {
- if (updatedFlags.flagsChanged()) {
+ private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags) throws MailboxException {
+ if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
eventBus.dispatch(EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(mailboxSession)
.mailbox(mailbox)
- .updatedFlag(updatedFlags)
+ .updatedFlags(updatedFlags)
.build(),
new MailboxIdRegistrationKey(mailboxId))
.block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 2f9457f..094ce72 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.mail;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import javax.mail.Flags;
@@ -54,5 +53,14 @@ public interface MessageIdMapper {
.forEach(this::delete);
}
- Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
+ /**
+ * Updates the flags of the messages with the given MessageId in the supplied mailboxes
+ *
+ * More one message can be updated when a message is contained several time in the same mailbox with distinct
+ * MessageUid.
+ *
+ * @return Metadata of the update, indexed by mailboxIds.
+ * @throws MailboxException
+ */
+ Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 3de1f43..0c9fc61 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
public abstract class MessageIdMapperTest {
private static final Username BENWA = Username.of("benwa");
@@ -367,7 +368,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -376,7 +377,7 @@ public abstract class MessageIdMapperTest {
.oldFlags(new Flags())
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap()).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -394,7 +395,7 @@ public abstract class MessageIdMapperTest {
.add("userflag")
.build();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -404,7 +405,8 @@ public abstract class MessageIdMapperTest {
.newFlags(newFlags)
.build();
- assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -422,7 +424,7 @@ public abstract class MessageIdMapperTest {
.add("userflag")
.build();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -432,7 +434,8 @@ public abstract class MessageIdMapperTest {
.newFlags(new Flags(Flags.Flag.RECENT))
.build();
- assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -466,17 +469,17 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
- assertThat(flags).isEmpty();
+ assertThat(flags.asMap()).isEmpty();
}
@Test
void setFlagsShouldReturnEmptyWhenMessageIdDoesntExist() throws Exception {
MessageId unknownMessageId = mapperProvider.generateMessageId();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
- assertThat(flags).isEmpty();
+ assertThat(flags.asMap()).isEmpty();
}
@Test
@@ -489,7 +492,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
Flags newFlags = new FlagsBuilder()
.add(Flag.RECENT)
@@ -502,7 +505,8 @@ public abstract class MessageIdMapperTest {
.oldFlags(initialFlags)
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -518,7 +522,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
ModSeq modSeqBenwaInboxMailbox = mapperProvider.highestModSeq(benwaInboxMailbox);
ModSeq modSeqBenwaWorkMailbox = mapperProvider.highestModSeq(benwaWorkMailbox);
@@ -534,8 +538,9 @@ public abstract class MessageIdMapperTest {
.oldFlags(new Flags())
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags),
- MapEntry.entry(message1InOtherMailbox.getMailboxId(), expectedUpdatedFlags2));
+ assertThat(flags.asMap())
+ .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)),
+ MapEntry.entry(message1InOtherMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags2)));
}
@Test
@@ -857,19 +862,19 @@ public abstract class MessageIdMapperTest {
message1.setFlags(flags);
sut.save(message1);
- Map<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
+ Multimap<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
ImmutableList.of(message1.getMailboxId()),
flags,
FlagsUpdateMode.ADD);
- assertThat(mailboxIdUpdatedFlagsMap)
+ assertThat(mailboxIdUpdatedFlagsMap.asMap())
.containsOnly(MapEntry.entry(message1.getMailboxId(),
- UpdatedFlags.builder()
+ ImmutableList.of(UpdatedFlags.builder()
.modSeq(modSeq)
.uid(message1.getUid())
.newFlags(flags)
.oldFlags(flags)
- .build()));
+ .build())));
}
@Test
@@ -943,6 +948,17 @@ public abstract class MessageIdMapperTest {
}
@Test
+ void setFlagsShouldReturnAllUp() throws Exception {
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+ Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+
+ assertThat(map.values()).hasSize(2);
+ assertThat(map.asMap()).hasSize(1);
+ }
+
+ @Test
void deletesShouldUpdateUnreadCount() throws Exception {
message1.setUid(mapperProvider.generateMessageUid());
message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 02/04: JAMES-3058 Add a confirmation header to call
SolveMailboxInconsistency task
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d58d7e9c72511165b3974e85ecf4d3b528cef878
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 27 18:08:55 2020 +0700
JAMES-3058 Add a confirmation header to call SolveMailboxInconsistency task
---
.../rabbitmq/RabbitMQWebAdminServerIntegrationTest.java | 1 +
.../routes/SolveMailboxInconsistenciesRequestToTask.java | 12 +++++++++++-
src/site/markdown/server/manage-webadmin.md | 3 +++
3 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
index dcf26ba..0f7b1e8 100644
--- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
+++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerIntegrationTest.java
@@ -178,6 +178,7 @@ class RabbitMQWebAdminServerIntegrationTest extends WebAdminServerIntegrationTes
.body("status", is("completed"));
taskId = with()
+ .header("I-KNOW-WHAT-I-M-DOING", "ALL-SERVICES-ARE-OFFLINE")
.queryParam("task", "SolveInconsistencies")
.post("/mailboxes")
.jsonPath()
diff --git a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
index 7418226..064dca5 100644
--- a/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
+++ b/server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/SolveMailboxInconsistenciesRequestToTask.java
@@ -26,12 +26,22 @@ import org.apache.james.mailbox.cassandra.mail.task.SolveMailboxInconsistenciesT
import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
import org.apache.james.webadmin.tasks.TaskRegistrationKey;
+import com.google.common.base.Preconditions;
+
public class SolveMailboxInconsistenciesRequestToTask extends TaskFromRequestRegistry.TaskRegistration {
private static final TaskRegistrationKey REGISTRATION_KEY = TaskRegistrationKey.of("SolveInconsistencies");
@Inject
public SolveMailboxInconsistenciesRequestToTask(SolveMailboxInconsistenciesService service) {
super(REGISTRATION_KEY,
- request -> new SolveMailboxInconsistenciesTask(service));
+ request -> {
+ Preconditions.checkArgument(request.headers("I-KNOW-WHAT-I-M-DOING")
+ .equalsIgnoreCase("ALL-SERVICES-ARE-OFFLINE"),
+ "Due to concurrency risks, a `I-KNOW-WHAT-I-M-DOING` header should be positioned to " +
+ "`ALL-SERVICES-ARE-OFFLINE` in order to prevent accidental calls. " +
+ "Check the documentation for details.");
+
+ return new SolveMailboxInconsistenciesTask(service);
+ });
}
}
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index 042752b..e88532e 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -453,6 +453,9 @@ A dirty read is when data is read between the two writes of the denormalization
In order to ensure being offline, stop the traffic on SMTP, JMAP and IMAP ports, for example via re-configuration or
firewall rules.
+Due to all of those risks, a `I-KNOW-WHAT-I-M-DOING` header should be positioned to `ALL-SERVICES-ARE-OFFLINE` in order
+to prevent accidental calls.
+
#### Recomputing mailbox counters
This task is only available on top of Guice Cassandra products.
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[james-project] 01/04: JAMES-2990 Reduce consistency level:
CassandraMessageFastViewProjection::store
Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 0afb84f2e8909346e76aca6d138168d1dcb0c153
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 5 09:22:17 2020 +0700
JAMES-2990 Reduce consistency level: CassandraMessageFastViewProjection::store
Rationals: An outdated projection is very well handled on the read path so
inconsistencies are acceptable.
We have been noticing failed writes (partial quorum writes) and relying on
Cassandra consistency mechanisms for this (hinted handoff, etc...) seems like
an acceptable solution.
---
.../projections/CassandraMessageFastViewProjection.java | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
index 628a3b6..21352c8 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraMessageFastViewProjection.java
@@ -39,7 +39,10 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
@@ -50,6 +53,7 @@ import reactor.core.publisher.Mono;
public class CassandraMessageFastViewProjection implements MessageFastViewProjection {
+ public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageFastViewProjection.class);
private final Metric metricRetrieveHitCount;
private final Metric metricRetrieveMissCount;
@@ -90,7 +94,8 @@ public class CassandraMessageFastViewProjection implements MessageFastViewProjec
return cassandraAsyncExecutor.executeVoid(storeStatement.bind()
.setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())
.setString(PREVIEW, precomputedProperties.getPreview().getValue())
- .setBool(HAS_ATTACHMENT, precomputedProperties.hasAttachment()));
+ .setBool(HAS_ATTACHMENT, precomputedProperties.hasAttachment())
+ .setConsistencyLevel(ConsistencyLevel.ONE));
}
@Override
@@ -98,10 +103,15 @@ public class CassandraMessageFastViewProjection implements MessageFastViewProjec
checkMessage(messageId);
return cassandraAsyncExecutor.executeSingleRow(retrieveStatement.bind()
- .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get()))
+ .setUUID(MESSAGE_ID, ((CassandraMessageId) messageId).get())
+ .setConsistencyLevel(ConsistencyLevel.ONE))
.map(this::fromRow)
.doOnNext(preview -> metricRetrieveHitCount.increment())
- .switchIfEmpty(Mono.fromRunnable(metricRetrieveMissCount::increment));
+ .switchIfEmpty(Mono.fromRunnable(metricRetrieveMissCount::increment))
+ .onErrorResume(e -> {
+ LOGGER.error("Error while retrieving MessageFastView projection item for {}", messageId, e);
+ return Mono.empty();
+ });
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org