You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/12/06 08:04:05 UTC
[james-project] 14/15: [PERF] Continue improve mailbox/cassandra row reading
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit faafc02dd4a846249f60937462e87e553f3adb4a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Dec 5 15:26:22 2022 +0700
[PERF] Continue improve mailbox/cassandra row reading
---
.../cassandra/mail/CassandraFirstUnseenDAO.java | 11 ++++++--
.../cassandra/mail/CassandraMailboxRecentsDAO.java | 5 +++-
.../cassandra/mail/CassandraMessageDAOV3.java | 31 +++++++++-------------
.../cassandra/mail/CassandraMessageIdDAO.java | 27 ++++++++++++++++---
4 files changed, 50 insertions(+), 24 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index bd5eeb2aff..53fa2726cf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -38,10 +38,12 @@ import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.google.common.collect.Lists;
@@ -59,6 +61,7 @@ public class CassandraFirstUnseenDAO {
private final PreparedStatement deleteAllStatement;
private final PreparedStatement readStatement;
private final PreparedStatement listStatement;
+ private final ProtocolVersion protocolVersion;
@Inject
public CassandraFirstUnseenDAO(CqlSession session) {
@@ -68,6 +71,7 @@ public class CassandraFirstUnseenDAO {
this.deleteAllStatement = prepareDeleteAllStatement(session);
this.readStatement = prepareReadStatement(session);
this.listStatement = prepareListStatement(session);
+ this.protocolVersion = session.getContext().getProtocolVersion();
}
private PreparedStatement prepareReadStatement(CqlSession session) {
@@ -172,14 +176,17 @@ public class CassandraFirstUnseenDAO {
return cassandraAsyncExecutor.executeSingleRow(
readStatement.bind()
.setUuid(MAILBOX_ID, cassandraId.asUuid()))
- .map(row -> MessageUid.of(row.getLong(UID)));
+ .map(this::asMessageUid);
}
public Flux<MessageUid> listUnseen(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeRows(
listStatement.bind()
.set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID))
- .map(row -> MessageUid.of(row.getLong(0)));
+ .map(this::asMessageUid);
}
+ private MessageUid asMessageUid(Row row) { // Converts a result row into a MessageUid using the column at index 0.
+ return MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion)); // decodes the raw bytes of column 0 with the BIGINT codec, replacing the row.getLong(...) lookups removed above
+ }
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index da5552eaa5..f30e4cd907 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
@@ -56,6 +57,7 @@ public class CassandraMailboxRecentsDAO {
private final PreparedStatement deleteStatement;
private final PreparedStatement deleteAllStatement;
private final PreparedStatement addStatement;
+ private final ProtocolVersion protocolVersion;
@Inject
public CassandraMailboxRecentsDAO(CqlSession session) {
@@ -64,6 +66,7 @@ public class CassandraMailboxRecentsDAO {
deleteStatement = createDeleteStatement(session);
deleteAllStatement = createDeleteAllStatement(session);
addStatement = createAddStatement(session);
+ protocolVersion = session.getContext().getProtocolVersion();
}
private PreparedStatement createReadStatement(CqlSession session) {
@@ -99,7 +102,7 @@ public class CassandraMailboxRecentsDAO {
public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeRows(bindWithMailbox(mailboxId, readStatement))
- .map(row -> row.getLong(0))
+ .map(row -> TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion))
.map(MessageUid::of);
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index ebe0675bcc..a7bf3669c4 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -60,13 +60,11 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
-import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
@@ -87,7 +85,6 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -118,7 +115,6 @@ public class CassandraMessageDAOV3 {
private final PreparedStatement select;
private final PreparedStatement listBlobs;
private final Cid.CidParser cidParser;
- private final DriverExecutionProfile lwtProfile;
private final UserDefinedType attachmentsType;
private final TypeCodec<List<UdtValue>> attachmentCodec;
@@ -126,7 +122,6 @@ public class CassandraMessageDAOV3 {
public CassandraMessageDAOV3(CqlSession session, CassandraTypesProvider typesProvider, BlobStore blobStore,
BlobId.Factory blobIdFactory) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
- this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
this.blobStore = blobStore;
this.blobIdFactory = blobIdFactory;
@@ -207,9 +202,9 @@ public class CassandraMessageDAOV3 {
.setString(CONTENT_MD5, message.getProperties().getContentMD5())
.setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
.setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
- .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), String.class)
- .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), String.class, String.class)
- .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), String.class, String.class);
+ .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), LIST_OF_STRINGS_CODEC)
+ .set(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), MAP_OF_STRINGS_CODEC)
+ .set(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC);
if (message.getAttachments().isEmpty()) {
return cassandraAsyncExecutor.executeVoid(boundStatement.unset(ATTACHMENTS).build());
@@ -266,9 +261,9 @@ public class CassandraMessageDAOV3 {
.setString(CONTENT_MD5, message.getProperties().getContentMD5())
.setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
.setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
- .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), String.class)
- .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), String.class, String.class)
- .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), String.class, String.class);
+ .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), LIST_OF_STRINGS_CODEC)
+ .set(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), MAP_OF_STRINGS_CODEC)
+ .set(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC);
if (message.getAttachments().isEmpty()) {
return boundStatement.unset(ATTACHMENTS);
@@ -334,8 +329,7 @@ public class CassandraMessageDAOV3 {
private Mono<Row> retrieveRow(CassandraMessageId messageId) {
return cassandraAsyncExecutor.executeSingleRow(select
.bind()
- .setUuid(MESSAGE_ID, messageId.get())
- .setExecutionProfile(lwtProfile));
+ .set(MESSAGE_ID, messageId.get(), TypeCodecs.TIMEUUID));
}
private Mono<MessageRepresentation> message(Row row, CassandraMessageId cassandraMessageId, FetchType fetchType) {
@@ -351,7 +345,7 @@ public class CassandraMessageDAOV3 {
row.getInt(BODY_START_OCTET),
content,
getProperties(row),
- getAttachments(row).collect(ImmutableList.toImmutableList()),
+ getAttachments(row),
headerId,
bodyId));
}
@@ -373,15 +367,16 @@ public class CassandraMessageDAOV3 {
return property.build();
}
- private Stream<MessageAttachmentRepresentation> getAttachments(Row row) {
+ private List<MessageAttachmentRepresentation> getAttachments(Row row) {
return Optional.ofNullable(row.get(ATTACHMENTS, attachmentCodec))
.map(this::attachmentByIds)
- .orElseGet(Stream::of);
+ .orElseGet(ImmutableList::of);
}
- private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UdtValue> udtValues) {
+ private List<MessageAttachmentRepresentation> attachmentByIds(List<UdtValue> udtValues) {
return udtValues.stream()
- .map(this::messageAttachmentByIdFrom);
+ .map(this::messageAttachmentByIdFrom)
+ .collect(ImmutableList.toImmutableList());
}
private MessageAttachmentRepresentation messageAttachmentByIdFrom(UdtValue udtValue) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 9382e7a62a..4d4177811a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -50,6 +50,8 @@ import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import javax.inject.Inject;
import javax.mail.Flags;
@@ -87,6 +89,20 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class CassandraMessageIdDAO {
+ private static class MemoizedSupplier<T> { // Holder that remembers the first non-null value produced by an initializer.
+ private final AtomicReference<T> value = new AtomicReference<>(); // holds null until a value has been stored
+
+ T get(Supplier<T> initializer) { // returns the stored value, or runs the initializer, stores its result and returns it
+ T result = value.get();
+ if (result == null) { // nothing stored yet; a null initializer result is stored as null, so the initializer runs again on the next call
+ T initialValue = initializer.get();
+ value.set(initialValue); // plain get-then-set (no compareAndSet): concurrent callers may each run the initializer and the last write wins
+ return initialValue;
+ }
+ return result;
+ }
+ }
+
private static final String IMAP_UID_GTE = IMAP_UID + "_GTE";
private static final String IMAP_UID_LTE = IMAP_UID + "_LTE";
public static final String LIMIT = "LIMIT_BIND_MARKER";
@@ -443,12 +459,17 @@ public class CassandraMessageIdDAO {
}
public Flux<MessageUid> listNotDeletedUids(CassandraId mailboxId, MessageRange range) {
+ MemoizedSupplier<Integer> deletedPosition = new MemoizedSupplier<>();
+ MemoizedSupplier<Integer> uidPosition = new MemoizedSupplier<>();
+
return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
.setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
- .filter(row -> !row.getBoolean(org.apache.james.mailbox.cassandra.table.Flag.DELETED))
- .map(row -> MessageUid.of(row.getLong(IMAP_UID)));
+ .filter(row -> !TypeCodecs.BOOLEAN.decodePrimitive(
+ row.getBytesUnsafe(deletedPosition.get(() -> row.getColumnDefinitions().firstIndexOf(DELETED))), protocolVersion))
+ .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(
+ row.getBytesUnsafe(uidPosition.get(() -> row.getColumnDefinitions().firstIndexOf(IMAP_UID))), protocolVersion)));
}
private Flux<MessageUid> doListUids(CassandraId mailboxId, MessageRange range) {
@@ -521,7 +542,7 @@ public class CassandraMessageIdDAO {
}
private Optional<CassandraMessageMetadata> fromRowToComposedMessageIdWithFlags(Row row) {
- UUID rowAsUuid = row.getUuid(MESSAGE_ID);
+ UUID rowAsUuid = row.get(MESSAGE_ID, TypeCodecs.TIMEUUID);
if (rowAsUuid == null) {
// Out of order updates with concurrent deletes can result in the row being partially deleted
// We filter out such records, and cleanup them.
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org