You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:53 UTC

[james-project] 02/04: JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 31b9d6e746de34ce3365790d5dc3c3680ead9717
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 12:18:03 2022 +0700

    JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement
---
 .../cassandra/mail/CassandraDeletedMessageDAO.java | 50 +++++++++++++++++++
 .../cassandra/mail/CassandraIndexTableHandler.java | 57 ++++++++++++++--------
 .../mail/CassandraDeletedMessageDAOTest.java       | 16 ++----
 3 files changed, 91 insertions(+), 32 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 69e301b6b4..01abf65271 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,6 +30,9 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
 
+import java.util.List;
+import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,9 +40,11 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MessageRange;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -47,6 +52,8 @@ import reactor.core.publisher.Mono;
 public class CassandraDeletedMessageDAO {
     private static final String UID_TO = "uid_to";
     private static final String UID_FROM = "uid_from";
+    private static final int BATCH_STATEMENT_WINDOW = 1024;
+    private static final int LOW_CONCURRENCY = 2;
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement addStatement;
@@ -123,12 +130,55 @@ public class CassandraDeletedMessageDAO {
                 .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> addDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(
+                addStatement.bind()
+                    .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                    .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(addStatement.bind()
+                        .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) {
         return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> removeDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+                .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+                        .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeAll(CassandraId cassandraId) {
         return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 5e2cd75fca..27682cc714 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -81,8 +82,7 @@ public class CassandraIndexTableHandler {
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
                 decrementCountersOnDelete(mailboxId, metaData))
             .then();
     }
@@ -93,8 +93,7 @@ public class CassandraIndexTableHandler {
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateRecentOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
             decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
                 .map(ComposedMessageIdWithMetaData::getFlags)
                 .collect(ImmutableList.toImmutableList())))
@@ -117,6 +116,18 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
+    private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaDatas) {
+        return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(composedId -> composedId.getFlags().contains(Flags.Flag.DELETED))
+            .map(composedId -> composedId.getComposedMessageId().getUid())
+            .collect(Collectors.toList()));
+    }
+
+    private Mono<Void> updateDeletedMessageProjectionOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+        return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(metaData -> metaData.getFlags().contains(Flags.Flag.DELETED))
+            .map(MessageMetaData::getUid)
+            .collect(Collectors.toList()));
+    }
+
     public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
         Flags flags = message.createFlags();
 
@@ -138,8 +149,7 @@ public class CassandraIndexTableHandler {
             .collect(ImmutableList.toImmutableList());
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(messages)
-                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+                checkDeletedOnAdd(mailboxId, messages),
                 Flux.fromIterable(messages)
                     .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
@@ -160,7 +170,7 @@ public class CassandraIndexTableHandler {
                 manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
-                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
+                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
             .then();
     }
 
@@ -171,20 +181,21 @@ public class CassandraIndexTableHandler {
                 .collect(ImmutableSet.toImmutableSet()));
     }
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
-        return Flux.fromIterable(updatedFlags)
-            .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
-            .then();
-    }
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        ImmutableList.Builder<MessageUid> addDeletedUidsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<MessageUid> removeDeletedUidsBuilder = ImmutableList.builder();
+        updatedFlags.forEach(flag -> {
+                if (flag.isModifiedToSet(Flags.Flag.DELETED)) {
+                    addDeletedUidsBuilder.add(flag.getUid());
+                } else if (flag.isModifiedToUnset(Flags.Flag.DELETED)) {
+                    removeDeletedUidsBuilder.add(flag.getUid());
+                }
+            });
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
-            return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid());
-        } else if (updatedFlags.isModifiedToUnset(Flags.Flag.DELETED)) {
-            return deletedMessageDAO.removeDeleted(mailboxId, updatedFlags.getUid());
-        } else {
-            return Mono.empty();
-        }
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+            deletedMessageDAO.addDeleted(mailboxId, addDeletedUidsBuilder.build()),
+            deletedMessageDAO.removeDeleted(mailboxId, removeDeletedUidsBuilder.build()))
+            .then();
     }
 
     private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Flags flags) {
@@ -292,6 +303,12 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
+    private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+        return deletedMessageDAO.addDeleted(mailboxId, mailboxMessages.stream().filter(message -> message.createFlags().contains(Flags.Flag.DELETED))
+            .map(MailboxMessage::getUid)
+            .collect(Collectors.toList()));
+    }
+
     private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.SEEN)) {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
index 87cc6a40c6..2bb4f3b790 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
@@ -66,8 +66,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void addDeletedMessageShouldThenBeReportedAsDeletedMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
                 .collectList()
@@ -78,8 +77,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void retrieveDeletedMessageShouldNotReturnDeletedEntries() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeAll(MAILBOX_ID).block();
 
@@ -122,8 +120,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void removeDeletedMessageShouldNotAffectOtherMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_2, UID_1)).block();
 
         testee.removeDeleted(MAILBOX_ID, UID_1).block();
 
@@ -150,12 +147,7 @@ class CassandraDeletedMessageDAOTest {
     }
 
     private void addMessageForRetrieveTest() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
-        testee.addDeleted(MAILBOX_ID, UID_3).block();
-        testee.addDeleted(MAILBOX_ID, UID_4).block();
-        testee.addDeleted(MAILBOX_ID, UID_7).block();
-        testee.addDeleted(MAILBOX_ID, UID_8).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2, UID_3, UID_4, UID_7, UID_8)).block();
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org