You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:53 UTC
[james-project] 02/04: JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 31b9d6e746de34ce3365790d5dc3c3680ead9717
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 12:18:03 2022 +0700
JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement
---
.../cassandra/mail/CassandraDeletedMessageDAO.java | 50 +++++++++++++++++++
.../cassandra/mail/CassandraIndexTableHandler.java | 57 ++++++++++++++--------
.../mail/CassandraDeletedMessageDAOTest.java | 16 ++----
3 files changed, 91 insertions(+), 32 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 69e301b6b4..01abf65271 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,6 +30,9 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
+import java.util.List;
+import java.util.stream.Stream;
+
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,9 +40,11 @@ import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.MessageRange;
+import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -47,6 +52,8 @@ import reactor.core.publisher.Mono;
public class CassandraDeletedMessageDAO {
private static final String UID_TO = "uid_to";
private static final String UID_FROM = "uid_from";
+ private static final int BATCH_STATEMENT_WINDOW = 1024;
+ private static final int LOW_CONCURRENCY = 2;
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement addStatement;
@@ -123,12 +130,55 @@ public class CassandraDeletedMessageDAO {
.setLong(UID, uid.asLong()));
}
+ public Mono<Void> addDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+ if (uids.size() == 1) {
+ return cassandraAsyncExecutor.executeVoid(
+ addStatement.bind()
+ .setUUID(MAILBOX_ID, cassandraId.asUuid())
+ .setLong(UID, uids.iterator().next().asLong()));
+ } else {
+ Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+ .stream()
+ .map(uidBatch -> {
+ BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+ uidBatch.forEach(uid -> batch.add(addStatement.bind()
+ .setUUID(MAILBOX_ID, cassandraId.asUuid())
+ .setLong(UID, uid.asLong())));
+ return batch;
+ });
+ return Flux.fromStream(batches)
+ .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+ .then();
+ }
+ }
+
public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUUID(MAILBOX_ID, cassandraId.asUuid())
.setLong(UID, uid.asLong()));
}
+ public Mono<Void> removeDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+ if (uids.size() == 1) {
+ return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+ .setUUID(MAILBOX_ID, cassandraId.asUuid())
+ .setLong(UID, uids.iterator().next().asLong()));
+ } else {
+ Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+ .stream()
+ .map(uidBatch -> {
+ BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+ uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+ .setUUID(MAILBOX_ID, cassandraId.asUuid())
+ .setLong(UID, uid.asLong())));
+ return batch;
+ });
+ return Flux.fromStream(batches)
+ .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+ .then();
+ }
+ }
+
public Mono<Void> removeAll(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
.setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 5e2cd75fca..27682cc714 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
@@ -81,8 +82,7 @@ public class CassandraIndexTableHandler {
.flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
Flux.fromIterable(metaData)
.flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
- Flux.fromIterable(metaData)
- .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+ updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
decrementCountersOnDelete(mailboxId, metaData))
.then();
}
@@ -93,8 +93,7 @@ public class CassandraIndexTableHandler {
.flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
Flux.fromIterable(metaData)
.flatMap(message -> updateRecentOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
- Flux.fromIterable(metaData)
- .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+ updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
.map(ComposedMessageIdWithMetaData::getFlags)
.collect(ImmutableList.toImmutableList())))
@@ -117,6 +116,18 @@ public class CassandraIndexTableHandler {
return Mono.empty();
}
+ private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaDatas) {
+ return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(composedId -> composedId.getFlags().contains(Flags.Flag.DELETED))
+ .map(composedId -> composedId.getComposedMessageId().getUid())
+ .collect(Collectors.toList()));
+ }
+
+ private Mono<Void> updateDeletedMessageProjectionOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+ return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(metaData -> metaData.getFlags().contains(Flags.Flag.DELETED))
+ .map(MessageMetaData::getUid)
+ .collect(Collectors.toList()));
+ }
+
public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
Flags flags = message.createFlags();
@@ -138,8 +149,7 @@ public class CassandraIndexTableHandler {
.collect(ImmutableList.toImmutableList());
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
- Flux.fromIterable(messages)
- .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+ checkDeletedOnAdd(mailboxId, messages),
Flux.fromIterable(messages)
.flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
Flux.fromIterable(messages)
@@ -160,7 +170,7 @@ public class CassandraIndexTableHandler {
manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
- updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
+ updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
.then();
}
@@ -171,20 +181,21 @@ public class CassandraIndexTableHandler {
.collect(ImmutableSet.toImmutableSet()));
}
- private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
- return Flux.fromIterable(updatedFlags)
- .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
- .then();
- }
+ private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ ImmutableList.Builder<MessageUid> addDeletedUidsBuilder = ImmutableList.builder();
+ ImmutableList.Builder<MessageUid> removeDeletedUidsBuilder = ImmutableList.builder();
+ updatedFlags.forEach(flag -> {
+ if (flag.isModifiedToSet(Flags.Flag.DELETED)) {
+ addDeletedUidsBuilder.add(flag.getUid());
+ } else if (flag.isModifiedToUnset(Flags.Flag.DELETED)) {
+ removeDeletedUidsBuilder.add(flag.getUid());
+ }
+ });
- private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
- if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
- return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid());
- } else if (updatedFlags.isModifiedToUnset(Flags.Flag.DELETED)) {
- return deletedMessageDAO.removeDeleted(mailboxId, updatedFlags.getUid());
- } else {
- return Mono.empty();
- }
+ return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+ deletedMessageDAO.addDeleted(mailboxId, addDeletedUidsBuilder.build()),
+ deletedMessageDAO.removeDeleted(mailboxId, removeDeletedUidsBuilder.build()))
+ .then();
}
private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Flags flags) {
@@ -292,6 +303,12 @@ public class CassandraIndexTableHandler {
return Mono.empty();
}
+ private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+ return deletedMessageDAO.addDeleted(mailboxId, mailboxMessages.stream().filter(message -> message.createFlags().contains(Flags.Flag.DELETED))
+ .map(MailboxMessage::getUid)
+ .collect(Collectors.toList()));
+ }
+
private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) {
if (flags.contains(Flags.Flag.SEEN)) {
return Mono.empty();
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
index 87cc6a40c6..2bb4f3b790 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
@@ -66,8 +66,7 @@ class CassandraDeletedMessageDAOTest {
@Test
void addDeletedMessageShouldThenBeReportedAsDeletedMessage() {
- testee.addDeleted(MAILBOX_ID, UID_1).block();
- testee.addDeleted(MAILBOX_ID, UID_2).block();
+ testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
.collectList()
@@ -78,8 +77,7 @@ class CassandraDeletedMessageDAOTest {
@Test
void retrieveDeletedMessageShouldNotReturnDeletedEntries() {
- testee.addDeleted(MAILBOX_ID, UID_1).block();
- testee.addDeleted(MAILBOX_ID, UID_2).block();
+ testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
testee.removeAll(MAILBOX_ID).block();
@@ -122,8 +120,7 @@ class CassandraDeletedMessageDAOTest {
@Test
void removeDeletedMessageShouldNotAffectOtherMessage() {
- testee.addDeleted(MAILBOX_ID, UID_2).block();
- testee.addDeleted(MAILBOX_ID, UID_1).block();
+ testee.addDeleted(MAILBOX_ID, List.of(UID_2, UID_1)).block();
testee.removeDeleted(MAILBOX_ID, UID_1).block();
@@ -150,12 +147,7 @@ class CassandraDeletedMessageDAOTest {
}
private void addMessageForRetrieveTest() {
- testee.addDeleted(MAILBOX_ID, UID_1).block();
- testee.addDeleted(MAILBOX_ID, UID_2).block();
- testee.addDeleted(MAILBOX_ID, UID_3).block();
- testee.addDeleted(MAILBOX_ID, UID_4).block();
- testee.addDeleted(MAILBOX_ID, UID_7).block();
- testee.addDeleted(MAILBOX_ID, UID_8).block();
+ testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2, UID_3, UID_4, UID_7, UID_8)).block();
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org