You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:55 UTC

[james-project] 04/04: JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b145cdc8c040280cd379fd514804fdd41ae3f918
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 13:44:33 2022 +0700

    JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement
---
 .../cassandra/mail/CassandraFirstUnseenDAO.java    | 51 ++++++++++++++++++
 .../cassandra/mail/CassandraIndexTableHandler.java | 63 +++++++++++++---------
 .../cassandra/mail/CassandraMessageMapper.java     |  4 +-
 .../mail/CassandraFirstUnseenDAOTest.java          | 19 +++----
 4 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index 6c5c2d91b9..c58499ea4d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -29,19 +29,27 @@ import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable
 import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.UID;
 
+import java.util.List;
+import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraFirstUnseenDAO {
+    private static final int BATCH_STATEMENT_WINDOW = 1024;
+    private static final int LOW_CONCURRENCY = 2;
+
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement addStatement;
     private final PreparedStatement deleteStatement;
@@ -100,12 +108,55 @@ public class CassandraFirstUnseenDAO {
                 .setLong(UID, uid.asLong()));
     }
 
+    /**
+     * Executes {@code addStatement} for every uid of {@code uids} within the given mailbox.
+     *
+     * <p>Delegates to {@code executeForAll}: a single uid is sent as one bound statement, larger lists are sent
+     * as UNLOGGED batches of at most {@code BATCH_STATEMENT_WINDOW} statements.</p>
+     *
+     * @param mailboxId mailbox the uids belong to
+     * @param uids uids to write; an empty list completes without issuing any query
+     * @return a Mono completing once every statement has been executed; it may be subscribed more than once
+     */
+    public Mono<Void> addUnread(CassandraId mailboxId, List<MessageUid> uids) {
+        return executeForAll(addStatement, mailboxId, uids);
+    }
+
     public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) {
         return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
+    /**
+     * Executes {@code deleteStatement} for every uid of {@code uids} within the given mailbox.
+     *
+     * <p>Same grouping strategy as {@link #addUnread(CassandraId, List)}.</p>
+     *
+     * @param mailboxId mailbox the uids belong to
+     * @param uids uids to delete; an empty list completes without issuing any query
+     * @return a Mono completing once every statement has been executed; it may be subscribed more than once
+     */
+    public Mono<Void> removeUnread(CassandraId mailboxId, List<MessageUid> uids) {
+        return executeForAll(deleteStatement, mailboxId, uids);
+    }
+
+    /**
+     * Binds {@code statement} with (mailboxId, uid) for each uid and executes the result.
+     *
+     * <p>One uid is executed as a plain bound statement to avoid batch overhead. Several uids are split into
+     * UNLOGGED batches of at most {@code BATCH_STATEMENT_WINDOW} statements, with at most
+     * {@code LOW_CONCURRENCY} batches in flight at once.</p>
+     */
+    private Mono<Void> executeForAll(PreparedStatement statement, CassandraId mailboxId, List<MessageUid> uids) {
+        if (uids.isEmpty()) {
+            return Mono.empty();
+        }
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(statement.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(UID, uids.get(0).asLong()));
+        }
+        // Supplier overload: a fresh Stream is built per subscription. Passing a Stream instance directly
+        // would make the returned Mono single-use, and a retry/re-subscription would then fail.
+        return Flux.fromStream(() -> batches(statement, mailboxId, uids))
+            .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+            .then();
+    }
+
+    /** Lazily groups {@code uids} into UNLOGGED batches of at most {@code BATCH_STATEMENT_WINDOW} bound statements. */
+    private Stream<BatchStatement> batches(PreparedStatement statement, CassandraId mailboxId, List<MessageUid> uids) {
+        return Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+            .stream()
+            .map(uidBatch -> {
+                BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                uidBatch.forEach(uid -> batch.add(statement.bind()
+                    .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                    .setLong(UID, uid.asLong())));
+                return batch;
+            });
+    }
+
     public Mono<Void> removeAll(CassandraId cassandraId) {
         return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index b1a5f45758..b16a020e54 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -78,8 +76,7 @@ public class CassandraIndexTableHandler {
 
     public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
+                updateFirstUnseenOnDeleteWithMetadata(mailboxId, metaData),
                 updateRecentOnDeleteWithMetadata(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
                 decrementCountersOnDelete(mailboxId, metaData))
@@ -88,8 +85,7 @@ public class CassandraIndexTableHandler {
 
     public Mono<Void> updateIndexOnDeleteComposedId(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaData) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
+                updateFirstUnseenOnDelete(mailboxId, metaData),
                 updateRecentOnDeleteWithComposeId(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
             decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
@@ -142,7 +138,7 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
-    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
+    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
         ImmutableSet<String> userFlags = messages.stream()
             .flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
             .collect(ImmutableSet.toImmutableSet());
@@ -152,8 +148,7 @@ public class CassandraIndexTableHandler {
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 checkDeletedOnAdd(mailboxId, messages),
-                Flux.fromIterable(messages)
-                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+                updateFirstUnseenOnAdd(mailboxId, messages),
                 addRecentOnSave(mailboxId, messages),
                 incrementCountersOnSave(mailboxId, flags),
                 applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
@@ -161,15 +156,14 @@ public class CassandraIndexTableHandler {
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        int fairConcurrency = 4;
-        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
+        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
     }
 
-    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
                 manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
-                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
                 updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
             .then();
@@ -303,6 +297,12 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.addUnread(mailboxId, uid);
     }
 
+    /**
+     * Registers in the first-unseen DAO the uids of the added messages whose flags do not contain SEEN.
+     * All such uids are handed to the DAO in a single call so it can group them into batches.
+     */
+    private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+        List<MessageUid> unseenUids = mailboxMessages.stream()
+            .filter(message -> !message.createFlags().contains(Flags.Flag.SEEN))
+            .map(MailboxMessage::getUid)
+            .collect(Collectors.toList());
+        return firstUnseenDAO.addUnread(mailboxId, unseenUids);
+    }
+
     private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.DELETED)) {
             return deletedMessageDAO.addDeleted(mailboxId, uid);
@@ -324,19 +324,32 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
-        return Flux.fromIterable(updatedFlags)
-            .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
-            .then();
+    /**
+     * Removes from the first-unseen DAO the uids of deleted messages whose flags do not contain SEEN.
+     * SEEN messages are skipped; the remaining uids are passed to the DAO in a single call.
+     */
+    private Mono<Void> updateFirstUnseenOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+        List<MessageUid> unseenUids = metaDatas.stream()
+            .filter(metaData -> !metaData.getFlags().contains(Flags.Flag.SEEN))
+            .map(MessageMetaData::getUid)
+            .collect(Collectors.toList());
+        return firstUnseenDAO.removeUnread(mailboxId, unseenUids);
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
-            return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
-        }
-        if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
-            return firstUnseenDAO.removeUnread(mailboxId, updatedFlags.getUid());
-        }
-        return Mono.empty();
+    /**
+     * Composed-id variant of the delete handling: removes from the first-unseen DAO the uids of deleted
+     * messages whose flags do not contain SEEN, in a single DAO call.
+     */
+    private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> composedMessageIdWithMetaData) {
+        List<MessageUid> unseenUids = composedMessageIdWithMetaData.stream()
+            .filter(entry -> !entry.getFlags().contains(Flags.Flag.SEEN))
+            .map(entry -> entry.getComposedMessageId().getUid())
+            .collect(Collectors.toList());
+        return firstUnseenDAO.removeUnread(mailboxId, unseenUids);
+    }
+
+    /**
+     * Splits the flag updates into uids whose SEEN flag was unset (added to the first-unseen DAO) and uids
+     * whose SEEN flag was set (removed from it). Updates that do not touch SEEN are ignored.
+     * Both DAO calls are merged with delayed errors, so a failure on one side does not cancel the other.
+     */
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        ImmutableList.Builder<MessageUid> becameUnseen = ImmutableList.builder();
+        ImmutableList.Builder<MessageUid> becameSeen = ImmutableList.builder();
+        for (UpdatedFlags update : updatedFlags) {
+            if (update.isModifiedToUnset(Flags.Flag.SEEN)) {
+                becameUnseen.add(update.getUid());
+            } else if (update.isModifiedToSet(Flags.Flag.SEEN)) {
+                becameSeen.add(update.getUid());
+            }
+        }
+
+        Mono<Void> addUnseen = firstUnseenDAO.addUnread(mailboxId, becameUnseen.build());
+        Mono<Void> removeSeen = firstUnseenDAO.removeUnread(mailboxId, becameSeen.build());
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, addUnseen, removeSeen)
+            .then();
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 06fcdf1994..54b9c18bed 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -521,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
+        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
             .onErrorResume(e -> {
                 LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
                 return Mono.empty();
@@ -644,7 +644,7 @@ public class CassandraMessageMapper implements MessageMapper {
             .concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
             .flatMap(id -> messageIdDAO.insert(id)
                 .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
-            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
+            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
     }
 
     private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
index a3a31a07d4..9d7aec0236 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
+import java.util.List;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.mailbox.MessageUid;
@@ -72,8 +74,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeAllShouldDeleteAllUidEntries() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeAll(MAILBOX_ID).block();
 
@@ -88,9 +89,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void retrieveFirstUnreadShouldReturnLowestUnreadUid() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
             .isEqualByComparingTo(UID_1);
@@ -114,9 +113,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void retrieveFirstUnreadShouldBeOrderIndependent() {
-        testee.addUnread(MAILBOX_ID, UID_2).block();
-
-        testee.addUnread(MAILBOX_ID, UID_1).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
             .isEqualByComparingTo(UID_1);
@@ -152,8 +149,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeUnreadShouldRemoveLastUnread() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeUnread(MAILBOX_ID, UID_2).block();
 
@@ -163,8 +159,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeUnreadShouldHaveNoEffectWhenNotLast() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeUnread(MAILBOX_ID, UID_1).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org