You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/17 02:21:49 UTC

[james-project] 06/11: [REFACTORING] Cassandra implementation should depend on interfaces for UidProvider and ModSeqProvider

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dac815063fb735c69d2b22e39979ba420eb52cea
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 13 15:59:47 2021 +0700

    [REFACTORING] Cassandra implementation should depend on interfaces for UidProvider and ModSeqProvider
    
    This enables people to inject their own generation logic, which might rely
    on stronger mechanisms than Cassandra lightweight transactions (LWTs).
    
    Candidates might be Atomix, ZooKeeper, Consul, etcd, etc.
---
 .../CassandraMailboxSessionMapperFactory.java         |  7 +++----
 .../cassandra/mail/CassandraMessageIdMapper.java      |  7 ++++---
 .../cassandra/mail/CassandraMessageMapper.java        | 16 +++++++++-------
 .../cassandra/mail/CassandraModSeqProvider.java       | 19 +++++++++----------
 .../mailbox/cassandra/mail/CassandraUidProvider.java  | 11 ++++++++---
 .../cassandra/mail/CassandraModSeqProviderTest.java   |  4 ++--
 .../mailbox/cassandra/mail/utils/GuiceUtils.java      |  3 +++
 .../james/mailbox/store/mail/ModSeqProvider.java      |  6 ++++++
 .../apache/james/mailbox/store/mail/UidProvider.java  | 16 ++++++++++++++++
 9 files changed, 60 insertions(+), 29 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 671b894..4194924 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -50,7 +50,6 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
-import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
 import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
 import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper;
@@ -73,8 +72,8 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     protected static final String ATTACHMENTMAPPER = "ATTACHMENTMAPPER";
 
     private final Session session;
-    private final CassandraUidProvider uidProvider;
-    private final CassandraModSeqProvider modSeqProvider;
+    private final UidProvider uidProvider;
+    private final ModSeqProvider modSeqProvider;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -101,7 +100,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
-    public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
+    public CassandraMailboxSessionMapperFactory(UidProvider uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
                                                 CassandraMessageDAO messageDAO,
                                                 CassandraMessageDAOV3 messageDAOV3, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
                                                 CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, CassandraMailboxDAO mailboxDAO,
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 9ac0b67..84407d8 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -49,6 +49,7 @@ import org.apache.james.mailbox.store.MailboxReactorUtils;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.ReactorUtils;
@@ -79,14 +80,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraIndexTableHandler indexTableHandler;
-    private final CassandraModSeqProvider modSeqProvider;
+    private final ModSeqProvider modSeqProvider;
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
 
     public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO,
                                     CassandraMessageDAO messageDAO, CassandraMessageDAOV3 messageDAOV3, CassandraIndexTableHandler indexTableHandler,
-                                    CassandraModSeqProvider modSeqProvider, CassandraConfiguration cassandraConfiguration) {
+                                    ModSeqProvider modSeqProvider, CassandraConfiguration cassandraConfiguration) {
 
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
@@ -308,7 +309,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         if (identicalFlags(oldComposedId, newFlags)) {
             return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
         } else {
-            return modSeqProvider.nextModSeq(cassandraId)
+            return modSeqProvider.nextModSeqReactive(cassandraId)
                 .map(modSeq -> new ComposedMessageIdWithMetaData(
                     oldComposedId.getComposedMessageId(),
                     newFlags,
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index f929b83..2a1f903 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -58,6 +58,8 @@ import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.FlagsUpdateCalculator;
 import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
+import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.task.Task;
@@ -82,8 +84,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
     private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);
 
-    private final CassandraModSeqProvider modSeqProvider;
-    private final CassandraUidProvider uidProvider;
+    private final ModSeqProvider modSeqProvider;
+    private final UidProvider uidProvider;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -99,7 +101,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private final RecomputeMailboxCountersService recomputeMailboxCountersService;
     private final SecureRandom secureRandom;
 
-    public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
+    public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider,
                                   CassandraAttachmentMapper attachmentMapper,
                                   CassandraMessageDAO messageDAO, CassandraMessageDAOV3 messageDAOV3, CassandraMessageIdDAO messageIdDAO,
                                   CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
@@ -343,10 +345,10 @@ public class CassandraMessageMapper implements MessageMapper {
 
     private Mono<MailboxMessage> addUidAndModseq(MailboxMessage message, CassandraId mailboxId) {
         Mono<MessageUid> messageUidMono = uidProvider
-            .nextUids(mailboxId)
+            .nextUidReactive(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
 
-        Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
+        Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeqReactive(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
 
         return Mono.zip(messageUidMono, nextModSeqMono)
@@ -438,7 +440,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<ModSeq> computeNewModSeq(CassandraId mailboxId) {
-        return modSeqProvider.nextModSeq(mailboxId)
+        return modSeqProvider.nextModSeqReactive(mailboxId)
             .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> new RuntimeException("ModSeq generation failed for mailbox " + mailboxId.asUuid())));
     }
 
@@ -492,7 +494,7 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         Mono<List<MessageUid>> uids = uidProvider.nextUids(mailboxId, messages.size());
-        Mono<ModSeq> nextModSeq = modSeqProvider.nextModSeq(mailboxId);
+        Mono<ModSeq> nextModSeq = modSeqProvider.nextModSeqReactive(mailboxId);
 
         Mono<List<MailboxMessage>> messagesWithUidAndModSeq = nextModSeq.flatMap(modSeq -> uids.map(uidList -> Pair.of(uidList, modSeq)))
             .map(pair -> pair.getKey().stream()
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index e41df05..65896d2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -121,19 +121,16 @@ public class CassandraModSeqProvider implements ModSeqProvider {
             .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
-
-
     @Override
     public ModSeq nextModSeq(Mailbox mailbox) throws MailboxException {
-        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return nextModSeq(mailboxId)
+        return nextModSeqReactive(mailbox.getMailboxId())
             .blockOptional()
-            .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
+            .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailbox.getMailboxId()));
     }
 
     @Override
     public ModSeq nextModSeq(MailboxId mailboxId) throws MailboxException {
-        return nextModSeq((CassandraId) mailboxId)
+        return nextModSeqReactive(mailboxId)
             .blockOptional()
             .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
     }
@@ -184,13 +181,15 @@ public class CassandraModSeqProvider implements ModSeqProvider {
         return Optional.empty();
     }
 
-    public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
+    @Override
+    public Mono<ModSeq> nextModSeqReactive(MailboxId mailboxId) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
         Duration firstBackoff = Duration.ofMillis(10);
 
-        return findHighestModSeq(mailboxId)
+        return findHighestModSeq(cassandraId)
             .flatMap(maybeHighestModSeq -> maybeHighestModSeq
-                        .map(highestModSeq -> tryUpdateModSeq(mailboxId, highestModSeq))
-                        .orElseGet(() -> tryInsertModSeq(mailboxId, ModSeq.first())))
+                        .map(highestModSeq -> tryUpdateModSeq(cassandraId, highestModSeq))
+                        .orElseGet(() -> tryInsertModSeq(cassandraId, ModSeq.first())))
             .single()
             .retryWhen(Retry.backoff(maxModSeqRetries, firstBackoff).scheduler(Schedulers.elastic()));
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index afc7111..877c15c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -105,12 +105,14 @@ public class CassandraUidProvider implements UidProvider {
     @Override
     public MessageUid nextUid(MailboxId mailboxId) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return nextUids(cassandraId)
+        return nextUidReactive(cassandraId)
             .blockOptional()
             .orElseThrow(() -> new MailboxException("Error during Uid update"));
     }
 
-    public Mono<MessageUid> nextUids(CassandraId cassandraId) {
+    @Override
+    public Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
         Mono<MessageUid> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
 
@@ -122,7 +124,10 @@ public class CassandraUidProvider implements UidProvider {
             .retryWhen(Retry.backoff(maxUidRetries, firstBackoff).scheduler(Schedulers.elastic()));
     }
 
-    public Mono<List<MessageUid>> nextUids(CassandraId cassandraId, int count) {
+    @Override
+    public Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
+
         Mono<List<MessageUid>> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid, count)
                 .map(highest -> range(messageUid, highest)));
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index 8773221..8ad2b1e 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -115,10 +115,10 @@ class CassandraModSeqProviderTest {
                     .times(1)
                     .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE mailboxId=:mailboxId;"));
 
-        CompletableFuture<ModSeq> operation1 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation1 = modSeqProvider.nextModSeqReactive(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
             .toFuture();
-        CompletableFuture<ModSeq> operation2 = modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation2 = modSeqProvider.nextModSeqReactive(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
             .toFuture();
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
index 1c6481a..c1c9ba1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
@@ -38,8 +38,10 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.json.DTO;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.eventsourcing.acl.ACLModule;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.UidProvider;
 
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableSet;
@@ -73,6 +75,7 @@ public class GuiceUtils {
                                         CassandraConfiguration configuration) {
         return Modules.combine(
             binder -> binder.bind(MessageId.Factory.class).toInstance(messageIdFactory),
+            binder -> binder.bind(UidProvider.class).to(CassandraUidProvider.class),
             binder -> binder.bind(BlobId.Factory.class).toInstance(new HashBlobId.Factory()),
             binder -> binder.bind(BlobStore.class).toProvider(() -> CassandraBlobStoreFactory.forTesting(session).passthrough()),
             binder -> binder.bind(Session.class).toInstance(session),
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
index 7c96c74..a363401 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
@@ -23,6 +23,8 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Take care of provide mod-seqences for a given {@link Mailbox}. Be aware that implementations
  * need to be thread-safe!
@@ -56,4 +58,8 @@ public interface ModSeqProvider {
      * Return the highest mod-sequence which were used for the {@link Mailbox}
      */
     ModSeq highestModSeq(MailboxId mailboxId) throws MailboxException;
+
+    default Mono<ModSeq> nextModSeqReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> nextModSeq(mailboxId));
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
index 57c3f8c..d4e6b38 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail;
 
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.mailbox.MessageUid;
@@ -25,6 +26,11 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 /**
  * Take care of provide uids for a given {@link Mailbox}. Be aware that implementations
  * need to be thread-safe!
@@ -44,4 +50,14 @@ public interface UidProvider {
     Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException;
 
     MessageUid nextUid(MailboxId mailboxId) throws MailboxException;
+
+    default Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> nextUid(mailboxId));
+    }
+
+    default Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
+        return Flux.range(0, count)
+            .flatMap(i -> nextUidReactive(mailboxId))
+            .collect(Guavate.toImmutableList());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org