You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2017/02/07 08:06:58 UTC

[09/10] james-project git commit: JAMES-1874 Factorize more logic between CassandraMessageIdMapper and CassandraMessageMapper

JAMES-1874 Factorize more logic between CassandraMessageIdMapper and CassandraMessageMapper


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/948b58f8
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/948b58f8
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/948b58f8

Branch: refs/heads/master
Commit: 948b58f8a34345f5231d985d63c179cbf0cdc97f
Parents: 796f8eb
Author: Benoit Tellier <bt...@linagora.com>
Authored: Fri Feb 3 18:51:04 2017 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Tue Feb 7 08:57:46 2017 +0700

----------------------------------------------------------------------
 .../CassandraMailboxSessionMapperFactory.java   |   8 +-
 .../mail/CassandraIndexTableHandler.java        | 103 ++++++
 .../mail/CassandraMessageIdMapper.java          |  61 +--
 .../cassandra/mail/CassandraMessageMapper.java  |  70 +---
 .../mail/CassandraIndexTableHandlerTest.java    | 369 +++++++++++++++++++
 5 files changed, 489 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/948b58f8/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index b384297..42dd429 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -25,6 +25,7 @@ import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.mail.CassandraAnnotationMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
+import org.apache.james.mailbox.cassandra.mail.CassandraIndexTableHandler;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
@@ -62,9 +63,9 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMailboxCounterDAO mailboxCounterDAO;
     private final CassandraMailboxRecentsDAO mailboxRecentsDAO;
+    private final CassandraIndexTableHandler indexTableHandler;
     private int maxRetry;
 
-
     @Inject
     public CassandraMailboxSessionMapperFactory(UidProvider uidProvider, ModSeqProvider modSeqProvider, 
             Session session, CassandraTypesProvider typesProvider,
@@ -78,6 +79,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         this.imapUidDAO = imapUidDAO;
         this.mailboxCounterDAO = mailboxCounterDAO;
         this.mailboxRecentsDAO = mailboxRecentsDAO;
+        this.indexTableHandler = new CassandraIndexTableHandler(mailboxRecentsDAO, mailboxCounterDAO);
         this.maxRetry = DEFAULT_MAX_RETRY;
         this.typesProvider = typesProvider;
     }
@@ -89,13 +91,13 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     @Override
     public CassandraMessageMapper createMessageMapper(MailboxSession mailboxSession) {
         return new CassandraMessageMapper(uidProvider, modSeqProvider, null, maxRetry, createAttachmentMapper(mailboxSession),
-                messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO, mailboxRecentsDAO);
+                messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO, mailboxRecentsDAO, indexTableHandler);
     }
 
     @Override
     public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException {
         return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), getAttachmentMapper(mailboxSession),
-                imapUidDAO, messageIdDAO, messageDAO, mailboxCounterDAO, mailboxRecentsDAO, modSeqProvider, mailboxSession);
+                imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/948b58f8/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
new file mode 100644
index 0000000..860a632
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -0,0 +1,103 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import java.util.concurrent.CompletableFuture;
+
+import javax.inject.Inject;
+import javax.mail.Flags;
+
+import org.apache.james.mailbox.cassandra.CassandraId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+
+public class CassandraIndexTableHandler {
+
+    private final CassandraMailboxRecentsDAO mailboxRecentDAO;
+    private final CassandraMailboxCounterDAO mailboxCounterDAO;
+
+    @Inject
+    public CassandraIndexTableHandler(CassandraMailboxRecentsDAO mailboxRecentDAO,
+                                      CassandraMailboxCounterDAO mailboxCounterDAO) {
+        this.mailboxRecentDAO = mailboxRecentDAO;
+        this.mailboxCounterDAO = mailboxCounterDAO;
+    }
+
+    public CompletableFuture<Void> updateIndexOnDelete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, CassandraId mailboxId) {
+        return CompletableFuture.allOf(
+            mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()),
+            mailboxCounterDAO.decrementCount(mailboxId),
+            decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags()));
+    }
+
+    public CompletableFuture<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
+        return CompletableFuture.allOf(
+            addRecentOnSave(mailboxId, message),
+            incrementUnseenOnSave(mailboxId, message.createFlags()),
+            mailboxCounterDAO.incrementCount(mailboxId));
+    }
+
+    public CompletableFuture<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
+        return CompletableFuture.allOf(manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
+            manageRecentOnFlagsUpdate(mailboxId, updatedFlags));
+    }
+
+    private CompletableFuture<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags flags) {
+        if (flags.contains(Flags.Flag.SEEN)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return mailboxCounterDAO.decrementUnseen(mailboxId);
+    }
+
+    private CompletableFuture<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags flags) {
+        if (flags.contains(Flags.Flag.SEEN)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return mailboxCounterDAO.incrementUnseen(mailboxId);
+    }
+
+    private CompletableFuture<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage message) {
+        if (message.createFlags().contains(Flags.Flag.RECENT)) {
+            return mailboxRecentDAO.addToRecent(mailboxId, message.getUid());
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
+        if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
+            return mailboxCounterDAO.incrementUnseen(mailboxId);
+        }
+        if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
+            return mailboxCounterDAO.decrementUnseen(mailboxId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
+        if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) {
+            return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid());
+        }
+        if (updatedFlags.isModifiedToSet(Flags.Flag.RECENT)) {
+            return mailboxRecentDAO.addToRecent(mailboxId, updatedFlags.getUid());
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/948b58f8/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 41d8749..e403268 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -68,21 +68,19 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageDAO messageDAO;
-    private final CassandraMailboxCounterDAO mailboxCounterDAO;
-    private final CassandraMailboxRecentsDAO mailboxRecentsDAO;
+    private final CassandraIndexTableHandler indexTableHandler;
     private final ModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
 
     public CassandraMessageIdMapper(MailboxMapper mailboxMapper, AttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
-                                    CassandraMailboxCounterDAO cassandraMailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) {
+                                    CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) {
         this.mailboxMapper = mailboxMapper;
         this.attachmentMapper = attachmentMapper;
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
         this.messageDAO = messageDAO;
-        this.mailboxCounterDAO = cassandraMailboxCounterDAO;
-        this.mailboxRecentsDAO = mailboxRecentsDAO;
+        this.indexTableHandler = indexTableHandler;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
     }
@@ -150,20 +148,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .thenCompose(voidValue -> CompletableFuture.allOf(
                 imapUidDAO.insert(composedMessageIdWithMetaData),
                 messageIdDAO.insert(composedMessageIdWithMetaData)))
-            .thenCompose(voidValue -> CompletableFuture.allOf(
-                mailboxRecentsDAO.addToRecent(mailboxId, mailboxMessage.getUid()),
-                mailboxCounterDAO.incrementCount(mailboxId),
-                incrementUnseenOnSave(mailboxId, mailboxMessage.createFlags())))
+            .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
             .join();
     }
 
-    private CompletableFuture<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags flags) {
-        if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return mailboxCounterDAO.incrementUnseen(mailboxId);
-    }
-
     @Override
     public void delete(MessageId messageId, List<MailboxId> mailboxIds) {
         CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId;
@@ -197,17 +185,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return CompletableFuture.allOf(
             imapUidDAO.delete(messageId, mailboxId),
             messageIdDAO.delete(mailboxId, metaData.getComposedMessageId().getUid()))
-            .thenCompose(voidValue -> CompletableFuture.allOf(
-                mailboxCounterDAO.decrementCount(mailboxId),
-                mailboxRecentsDAO.removeFromRecent(mailboxId, metaData.getComposedMessageId().getUid()),
-                decrementUnseenOnDelete(mailboxId, metaData.getFlags())));
-    }
-
-    private CompletableFuture<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags flags) {
-        if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return mailboxCounterDAO.decrementUnseen(mailboxId);
+            .thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(metaData, mailboxId));
     }
 
     @Override
@@ -246,37 +224,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     private CompletableFuture<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, UpdatedFlags> pair) {
         CassandraId cassandraId = (CassandraId) pair.getLeft();
-        return CompletableFuture.allOf(
-            manageRecent(pair.getRight(), cassandraId),
-            incrementCountIfNeeded(pair.getRight(), cassandraId),
-            decrementCountIfNeeded(pair.getRight(), cassandraId))
+        return indexTableHandler.updateIndexOnFlagsUpdate(cassandraId, pair.getRight())
             .thenApply(voidValue -> pair);
     }
 
-    private CompletableFuture<Void> manageRecent(UpdatedFlags updatedFlags, CassandraId cassandraId) {
-        if (updatedFlags.isSet(Flags.Flag.RECENT)) {
-            return mailboxRecentsDAO.addToRecent(cassandraId, updatedFlags.getUid());
-        }
-        if (updatedFlags.isUnset(Flags.Flag.RECENT)){
-            return mailboxRecentsDAO.removeFromRecent(cassandraId, updatedFlags.getUid());
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private CompletableFuture<Void> incrementCountIfNeeded(UpdatedFlags updatedFlags, CassandraId cassandraId) {
-        if (updatedFlags.isUnset(Flags.Flag.SEEN)) {
-            return mailboxCounterDAO.incrementUnseen(cassandraId);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private CompletableFuture<Void> decrementCountIfNeeded(UpdatedFlags updatedFlags,  CassandraId cassandraId) {
-        if (updatedFlags.isSet(Flags.Flag.SEEN)) {
-            return mailboxCounterDAO.decrementUnseen(cassandraId);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
     private Optional<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
         try {
             return updateFlags(mailboxId, messageId, newState, updateMode);

http://git-wip-us.apache.org/repos/asf/james-project/blob/948b58f8/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 38d27b8..91d600e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -76,11 +76,12 @@ public class CassandraMessageMapper implements MessageMapper {
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMailboxCounterDAO mailboxCounterDAO;
     private final CassandraMailboxRecentsDAO mailboxRecentDAO;
+    private final CassandraIndexTableHandler indexTableHandler;
 
     public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider,
                                   MailboxSession mailboxSession, int maxRetries, AttachmentMapper attachmentMapper,
                                   CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
-                                  CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO) {
+                                  CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraIndexTableHandler indexTableHandler) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
@@ -91,6 +92,7 @@ public class CassandraMessageMapper implements MessageMapper {
         this.imapUidDAO = imapUidDAO;
         this.mailboxCounterDAO = mailboxCounterDAO;
         this.mailboxRecentDAO = mailboxRecentDAO;
+        this.indexTableHandler = indexTableHandler;
     }
 
     @Override
@@ -125,27 +127,9 @@ public class CassandraMessageMapper implements MessageMapper {
         return CompletableFuture.allOf(
             imapUidDAO.delete(messageId, mailboxId),
             messageIdDAO.delete(mailboxId, uid)
-        ).thenCompose(voidValue -> CompletableFuture.allOf(
-            removeRecentOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageId.getUid()),
-            mailboxCounterDAO.decrementCount(mailboxId),
-            decrementUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags())));
+        ).thenCompose(voidValue -> indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
     }
 
-    private CompletableFuture<Void> decrementUnseenOnDelete(CassandraId mailboxId, Flags flags) {
-        if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return mailboxCounterDAO.decrementUnseen(mailboxId);
-    }
-
-    private CompletableFuture<Void> removeRecentOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) {
-        if (flags.contains(Flag.RECENT)) {
-            return mailboxRecentDAO.removeFromRecent(mailboxId, uid);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-
     private CompletableFuture<Optional<ComposedMessageIdWithMetaData>> retrieveMessageId(CassandraId mailboxId, MailboxMessage message) {
         return messageIdDAO.retrieve(mailboxId, message.getUid());
     }
@@ -231,34 +215,19 @@ public class CassandraMessageMapper implements MessageMapper {
         message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         save(mailbox, message)
-            .thenCompose(voidValue -> CompletableFuture.allOf(
-                addRecentOnSave(mailboxId, message),
-                incrementUnseenOnSave(mailboxId, message.createFlags()),
-                mailboxCounterDAO.incrementCount(mailboxId)))
+            .thenCompose(voidValue -> indexTableHandler.updateIndexOnAdd(message, mailboxId))
             .join();
         return new SimpleMessageMetaData(message);
     }
 
-    private CompletableFuture<Void> incrementUnseenOnSave(CassandraId mailboxId, Flags flags) {
-        if (flags.contains(Flags.Flag.SEEN)) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return mailboxCounterDAO.incrementUnseen(mailboxId);
-    }
 
-    private CompletableFuture<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage message) {
-        if (message.createFlags().contains(Flag.RECENT)) {
-            return mailboxRecentDAO.addToRecent(mailboxId, message.getUid());
-        }
-        return CompletableFuture.completedFuture(null);
-    }
 
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return retrieveMessages(retrieveMessageIds(mailboxId, set), FetchType.Metadata, Optional.empty())
                 .flatMap(message -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, message))
-                .map((UpdatedFlags updatedFlags) -> manageCounters(mailbox, updatedFlags)
+                .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
                     .thenApply(voidValue -> updatedFlags))
                 .map(CompletableFuture::join)
                 .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
@@ -297,33 +266,6 @@ public class CassandraMessageMapper implements MessageMapper {
                 imapUidDAO.insert(composedMessageIdWithMetaData));
     }
 
-    private CompletableFuture<Void> manageCounters(Mailbox mailbox, UpdatedFlags updatedFlags) {
-        return CompletableFuture.allOf(manageUnseenMessageCounts(mailbox, updatedFlags),
-                manageRecents(mailbox, updatedFlags));
-    }
-
-    private CompletableFuture<Void> manageUnseenMessageCounts(Mailbox mailbox, UpdatedFlags updatedFlags) {
-        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        if (updatedFlags.isUnset(Flag.SEEN)) {
-            return mailboxCounterDAO.incrementUnseen(mailboxId);
-        }
-        if (updatedFlags.isSet(Flag.SEEN)) {
-            return mailboxCounterDAO.decrementUnseen(mailboxId);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private CompletableFuture<Void> manageRecents(Mailbox mailbox, UpdatedFlags updatedFlags) {
-        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        if (updatedFlags.isUnset(Flag.RECENT)) {
-            return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid());
-        }
-        if (updatedFlags.isSet(Flag.RECENT)) {
-            return mailboxRecentDAO.addToRecent(mailboxId, updatedFlags.getUid());
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
     private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) {
         return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message)
             .map(Stream::of)

http://git-wip-us.apache.org/repos/asf/james-project/blob/948b58f8/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
new file mode 100644
index 0000000..93d14e4
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
@@ -0,0 +1,369 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
+
+import javax.mail.Flags;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.cassandra.CassandraId;
+import org.apache.james.mailbox.cassandra.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.modules.CassandraMailboxCounterModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraMailboxRecentsModule;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CassandraIndexTableHandlerTest {
+
+    public static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+    public static final MessageUid MESSAGE_UID = MessageUid.of(18L);
+    public static final CassandraMessageId CASSANDRA_MESSAGE_ID = new CassandraMessageId.Factory().generate();
+    public static final int UID_VALIDITY = 15;
+    public static final long MODSEQ = 17;
+
+    private CassandraCluster cassandra;
+    private CassandraMailboxCounterDAO mailboxCounterDAO;
+    private CassandraMailboxRecentsDAO mailboxRecentsDAO;
+    private CassandraIndexTableHandler testee;
+    private Mailbox mailbox;
+
+    @Before
+    public void setUp() {
+        cassandra = CassandraCluster.create(
+            new CassandraModuleComposite(
+                new CassandraMailboxCounterModule(),
+                new CassandraMailboxRecentsModule()));
+        cassandra.ensureAllTables();
+
+        mailboxCounterDAO = new CassandraMailboxCounterDAO(cassandra.getConf());
+        mailboxRecentsDAO = new CassandraMailboxRecentsDAO(cassandra.getConf());
+        testee = new CassandraIndexTableHandler(mailboxRecentsDAO, mailboxCounterDAO);
+
+        mailbox = new SimpleMailbox(new MailboxPath("#private", "user", "name"),
+            UID_VALIDITY,
+            MAILBOX_ID);
+    }
+
+    @After
+    public void tearDown() {
+        cassandra.clearAllTables();
+    }
+
+    @Test
+    public void updateIndexOnAddShouldIncrementMessageCount() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnAddShouldIncrementUnseenMessageCountWhenUnseen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnAddShouldNotIncrementUnseenMessageCountWhenSeen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.SEEN));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void updateIndexOnAddShouldNotAddRecentWhenNoRecent() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void updateIndexOnAddShouldAddRecentWhenRecent() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.RECENT));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .containsOnly(MESSAGE_UID);
+    }
+
+    @Test
+    public void updateIndexOnDeleteShouldDecrementMessageCount() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
+                new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
+                new Flags(Flags.Flag.RECENT),
+                MODSEQ),
+            MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void updateIndexOnDeleteShouldDecrementUnseenMessageCountWhenUnseen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
+                new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
+                new Flags(),
+                MODSEQ),
+            MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void updateIndexOnDeleteShouldNotDecrementUnseenMessageCountWhenSeen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
+                new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
+                new Flags(Flags.Flag.SEEN),
+                MODSEQ),
+            MAILBOX_ID).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnDeleteShouldRemoveRecentWhenRecent() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.RECENT));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
+            new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
+                new Flags(Flags.Flag.RECENT),
+                MODSEQ),
+            MAILBOX_ID).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void updateIndexOnDeleteShouldRemoveUidFromRecentAnyway() throws Exception {
+        // Clean up strategy if some flags updates missed
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.RECENT));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
+                new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
+                new Flags(),
+                MODSEQ),
+            MAILBOX_ID).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldNotChangeMessageCount() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags(Flags.Flag.RECENT))
+            .oldFlags(new Flags())
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldDecrementUnseenMessageCountWhenSeenIsSet() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags(Flags.Flag.SEEN))
+            .oldFlags(new Flags())
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldIncrementUnseenMessageCountWhenSeenIsUnset() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.SEEN));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags())
+            .oldFlags(new Flags(Flags.Flag.SEEN))
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldNotChangeUnseenCountWhenBothSeen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.SEEN));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags(Flags.Flag.SEEN))
+            .oldFlags(new Flags(Flags.Flag.SEEN))
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(0);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldNotChangeUnseenCountWhenBothUnSeen() throws Exception {
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags())
+            .oldFlags(new Flags())
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        Optional<Long> actual = mailboxCounterDAO.countUnseenMessagesInMailbox(mailbox).join();
+        assertThat(actual.isPresent()).isTrue();
+        assertThat(actual.get()).isEqualTo(1);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldAddRecentOnSettingRecentFlag() throws Exception {
+        // Clean up strategy if some flags updates missed
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags());
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags(Flags.Flag.RECENT))
+            .oldFlags(new Flags())
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .containsOnly(MESSAGE_UID);
+    }
+
+    @Test
+    public void updateIndexOnFlagsUpdateShouldRemoveRecentOnUnsettingRecentFlag() throws Exception {
+        // Clean up strategy if some flags updates missed
+        MailboxMessage message = mock(MailboxMessage.class);
+        when(message.createFlags()).thenReturn(new Flags(Flags.Flag.RECENT));
+        when(message.getUid()).thenReturn(MESSAGE_UID);
+        testee.updateIndexOnAdd(message, MAILBOX_ID).join();
+
+        testee.updateIndexOnFlagsUpdate(MAILBOX_ID, UpdatedFlags.builder()
+            .uid(MESSAGE_UID)
+            .newFlags(new Flags())
+            .oldFlags(new Flags(Flags.Flag.RECENT))
+            .modSeq(MODSEQ)
+            .build()).join();
+
+        assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+            .isEmpty();
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org