You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:55 UTC
[james-project] 04/04: JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b145cdc8c040280cd379fd514804fdd41ae3f918
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 13:44:33 2022 +0700
JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement
---
.../cassandra/mail/CassandraFirstUnseenDAO.java | 51 ++++++++++++++++++
.../cassandra/mail/CassandraIndexTableHandler.java | 63 +++++++++++++---------
.../cassandra/mail/CassandraMessageMapper.java | 4 +-
.../mail/CassandraFirstUnseenDAOTest.java | 19 +++----
4 files changed, 98 insertions(+), 39 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index 6c5c2d91b9..c58499ea4d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -29,19 +29,27 @@ import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable
import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.TABLE_NAME;
import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.UID;
+import java.util.List;
+import java.util.stream.Stream;
+
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CassandraFirstUnseenDAO {
+ private static final int BATCH_STATEMENT_WINDOW = 1024;
+ private static final int LOW_CONCURRENCY = 2;
+
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement addStatement;
private final PreparedStatement deleteStatement;
@@ -100,12 +108,55 @@ public class CassandraFirstUnseenDAO {
.setLong(UID, uid.asLong()));
}
+ public Mono<Void> addUnread(CassandraId mailboxId, List<MessageUid> uids) {
+ if (uids.size() == 1) {
+ return cassandraAsyncExecutor.executeVoid(
+ addStatement.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(UID, uids.iterator().next().asLong()));
+ } else {
+ Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+ .stream()
+ .map(uidBatch -> {
+ BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+ uidBatch.forEach(uid -> batch.add(addStatement.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(UID, uid.asLong())));
+ return batch;
+ });
+ return Flux.fromStream(batches)
+ .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+ .then();
+ }
+ }
+
public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
.setUUID(MAILBOX_ID, cassandraId.asUuid())
.setLong(UID, uid.asLong()));
}
+ public Mono<Void> removeUnread(CassandraId mailboxId, List<MessageUid> uids) {
+ if (uids.size() == 1) {
+ return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(UID, uids.iterator().next().asLong()));
+ } else {
+ Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+ .stream()
+ .map(uidBatch -> {
+ BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+ uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(UID, uid.asLong())));
+ return batch;
+ });
+ return Flux.fromStream(batches)
+ .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+ .then();
+ }
+ }
+
public Mono<Void> removeAll(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
.setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index b1a5f45758..b16a020e54 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,8 +19,6 @@
package org.apache.james.mailbox.cassandra.mail;
-import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
-
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@@ -78,8 +76,7 @@ public class CassandraIndexTableHandler {
public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
- Flux.fromIterable(metaData)
- .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
+ updateFirstUnseenOnDeleteWithMetadata(mailboxId, metaData),
updateRecentOnDeleteWithMetadata(mailboxId, metaData),
updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
decrementCountersOnDelete(mailboxId, metaData))
@@ -88,8 +85,7 @@ public class CassandraIndexTableHandler {
public Mono<Void> updateIndexOnDeleteComposedId(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaData) {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
- Flux.fromIterable(metaData)
- .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
+ updateFirstUnseenOnDelete(mailboxId, metaData),
updateRecentOnDeleteWithComposeId(mailboxId, metaData),
updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
@@ -142,7 +138,7 @@ public class CassandraIndexTableHandler {
.then();
}
- public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
+ public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
ImmutableSet<String> userFlags = messages.stream()
.flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
.collect(ImmutableSet.toImmutableSet());
@@ -152,8 +148,7 @@ public class CassandraIndexTableHandler {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
checkDeletedOnAdd(mailboxId, messages),
- Flux.fromIterable(messages)
- .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+ updateFirstUnseenOnAdd(mailboxId, messages),
addRecentOnSave(mailboxId, messages),
incrementCountersOnSave(mailboxId, flags),
applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
@@ -161,15 +156,14 @@ public class CassandraIndexTableHandler {
}
public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
- int fairConcurrency = 4;
- return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
+ return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
}
- public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
+ public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
- updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+ updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
.then();
@@ -303,6 +297,12 @@ public class CassandraIndexTableHandler {
return firstUnseenDAO.addUnread(mailboxId, uid);
}
+ private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+ return firstUnseenDAO.addUnread(mailboxId, mailboxMessages.stream().filter(mailboxMessage -> !mailboxMessage.createFlags().contains(Flags.Flag.SEEN))
+ .map(MailboxMessage::getUid)
+ .collect(Collectors.toList()));
+ }
+
private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) {
if (flags.contains(Flags.Flag.DELETED)) {
return deletedMessageDAO.addDeleted(mailboxId, uid);
@@ -324,19 +324,32 @@ public class CassandraIndexTableHandler {
return firstUnseenDAO.removeUnread(mailboxId, uid);
}
- private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
- return Flux.fromIterable(updatedFlags)
- .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
- .then();
+ private Mono<Void> updateFirstUnseenOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+ return firstUnseenDAO.removeUnread(mailboxId, metaDatas.stream().filter(metaData -> !metaData.getFlags().contains(Flags.Flag.SEEN))
+ .map(MessageMetaData::getUid)
+ .collect(Collectors.toList()));
}
- private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
- if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
- return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
- }
- if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
- return firstUnseenDAO.removeUnread(mailboxId, updatedFlags.getUid());
- }
- return Mono.empty();
+ private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> composedMessageIdWithMetaData) {
+ return firstUnseenDAO.removeUnread(mailboxId, composedMessageIdWithMetaData.stream().filter(composeId -> !composeId.getFlags().contains(Flags.Flag.SEEN))
+ .map(composeId -> composeId.getComposedMessageId().getUid())
+ .collect(Collectors.toList()));
+ }
+
+ private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ ImmutableList.Builder<MessageUid> addUnreadUidsBuilder = ImmutableList.builder();
+ ImmutableList.Builder<MessageUid> removeUnreadUidsBuilder = ImmutableList.builder();
+ updatedFlags.forEach(flag -> {
+ if (flag.isModifiedToUnset(Flags.Flag.SEEN)) {
+ addUnreadUidsBuilder.add(flag.getUid());
+ } else if (flag.isModifiedToSet(Flags.Flag.SEEN)) {
+ removeUnreadUidsBuilder.add(flag.getUid());
+ }
+ });
+
+ return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+ firstUnseenDAO.addUnread(mailboxId, addUnreadUidsBuilder.build()),
+ firstUnseenDAO.removeUnread(mailboxId, removeUnreadUidsBuilder.build()))
+ .then();
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 06fcdf1994..54b9c18bed 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -521,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
- return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
+ return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
.onErrorResume(e -> {
LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
return Mono.empty();
@@ -644,7 +644,7 @@ public class CassandraMessageMapper implements MessageMapper {
.concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
.flatMap(id -> messageIdDAO.insert(id)
.retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
- .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
+ .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
}
private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
index a3a31a07d4..9d7aec0236 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
+import java.util.List;
+
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.mailbox.MessageUid;
@@ -72,8 +74,7 @@ class CassandraFirstUnseenDAOTest {
@Test
void removeAllShouldDeleteAllUidEntries() {
- testee.addUnread(MAILBOX_ID, UID_1).block();
- testee.addUnread(MAILBOX_ID, UID_2).block();
+ testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
testee.removeAll(MAILBOX_ID).block();
@@ -88,9 +89,7 @@ class CassandraFirstUnseenDAOTest {
@Test
void retrieveFirstUnreadShouldReturnLowestUnreadUid() {
- testee.addUnread(MAILBOX_ID, UID_1).block();
-
- testee.addUnread(MAILBOX_ID, UID_2).block();
+ testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
.isEqualByComparingTo(UID_1);
@@ -114,9 +113,7 @@ class CassandraFirstUnseenDAOTest {
@Test
void retrieveFirstUnreadShouldBeOrderIndependent() {
- testee.addUnread(MAILBOX_ID, UID_2).block();
-
- testee.addUnread(MAILBOX_ID, UID_1).block();
+ testee.addUnread(MAILBOX_ID, List.of(UID_2, UID_1)).block();
assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
.isEqualByComparingTo(UID_1);
@@ -152,8 +149,7 @@ class CassandraFirstUnseenDAOTest {
@Test
void removeUnreadShouldRemoveLastUnread() {
- testee.addUnread(MAILBOX_ID, UID_1).block();
- testee.addUnread(MAILBOX_ID, UID_2).block();
+ testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
testee.removeUnread(MAILBOX_ID, UID_2).block();
@@ -163,8 +159,7 @@ class CassandraFirstUnseenDAOTest {
@Test
void removeUnreadShouldHaveNoEffectWhenNotLast() {
- testee.addUnread(MAILBOX_ID, UID_1).block();
- testee.addUnread(MAILBOX_ID, UID_2).block();
+ testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
testee.removeUnread(MAILBOX_ID, UID_1).block();
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org