You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/12/06 08:04:05 UTC

[james-project] 14/15: [PERF] Continue improving mailbox/cassandra row reading

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit faafc02dd4a846249f60937462e87e553f3adb4a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Dec 5 15:26:22 2022 +0700

    [PERF] Continue improving mailbox/cassandra row reading
---
 .../cassandra/mail/CassandraFirstUnseenDAO.java    | 11 ++++++--
 .../cassandra/mail/CassandraMailboxRecentsDAO.java |  5 +++-
 .../cassandra/mail/CassandraMessageDAOV3.java      | 31 +++++++++-------------
 .../cassandra/mail/CassandraMessageIdDAO.java      | 27 ++++++++++++++++---
 4 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index bd5eeb2aff..53fa2726cf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -38,10 +38,12 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 
 import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.cql.BatchStatement;
 import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
 import com.datastax.oss.driver.api.core.cql.BatchType;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 import com.google.common.collect.Lists;
@@ -59,6 +61,7 @@ public class CassandraFirstUnseenDAO {
     private final PreparedStatement deleteAllStatement;
     private final PreparedStatement readStatement;
     private final PreparedStatement listStatement;
+    private final ProtocolVersion protocolVersion;
 
     @Inject
     public CassandraFirstUnseenDAO(CqlSession session) {
@@ -68,6 +71,7 @@ public class CassandraFirstUnseenDAO {
         this.deleteAllStatement = prepareDeleteAllStatement(session);
         this.readStatement = prepareReadStatement(session);
         this.listStatement = prepareListStatement(session);
+        this.protocolVersion = session.getContext().getProtocolVersion();
     }
 
     private PreparedStatement prepareReadStatement(CqlSession session) {
@@ -172,14 +176,17 @@ public class CassandraFirstUnseenDAO {
         return cassandraAsyncExecutor.executeSingleRow(
                 readStatement.bind()
                     .setUuid(MAILBOX_ID, cassandraId.asUuid()))
-            .map(row -> MessageUid.of(row.getLong(UID)));
+            .map(this::asMessageUid);
     }
 
     public Flux<MessageUid> listUnseen(CassandraId cassandraId) {
         return cassandraAsyncExecutor.executeRows(
                 listStatement.bind()
                     .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID))
-            .map(row -> MessageUid.of(row.getLong(0)));
+            .map(this::asMessageUid);
     }
 
+    private MessageUid asMessageUid(Row row) {
+        return MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion));
+    }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index da5552eaa5..f30e4cd907 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
 
 import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.cql.BatchStatement;
 import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
 import com.datastax.oss.driver.api.core.cql.BatchType;
@@ -56,6 +57,7 @@ public class CassandraMailboxRecentsDAO {
     private final PreparedStatement deleteStatement;
     private final PreparedStatement deleteAllStatement;
     private final PreparedStatement addStatement;
+    private final ProtocolVersion protocolVersion;
 
     @Inject
     public CassandraMailboxRecentsDAO(CqlSession session) {
@@ -64,6 +66,7 @@ public class CassandraMailboxRecentsDAO {
         deleteStatement = createDeleteStatement(session);
         deleteAllStatement = createDeleteAllStatement(session);
         addStatement = createAddStatement(session);
+        protocolVersion = session.getContext().getProtocolVersion();
     }
 
     private PreparedStatement createReadStatement(CqlSession session) {
@@ -99,7 +102,7 @@ public class CassandraMailboxRecentsDAO {
 
     public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId mailboxId) {
         return cassandraAsyncExecutor.executeRows(bindWithMailbox(mailboxId, readStatement))
-            .map(row -> row.getLong(0))
+            .map(row -> TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion))
             .map(MessageUid::of);
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index ebe0675bcc..a7bf3669c4 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -60,13 +60,11 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
-import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
@@ -87,7 +85,6 @@ import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 import com.datastax.oss.driver.api.core.CqlSession;
-import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -118,7 +115,6 @@ public class CassandraMessageDAOV3 {
     private final PreparedStatement select;
     private final PreparedStatement listBlobs;
     private final Cid.CidParser cidParser;
-    private final DriverExecutionProfile lwtProfile;
     private final UserDefinedType attachmentsType;
     private final TypeCodec<List<UdtValue>> attachmentCodec;
 
@@ -126,7 +122,6 @@ public class CassandraMessageDAOV3 {
     public CassandraMessageDAOV3(CqlSession session, CassandraTypesProvider typesProvider, BlobStore blobStore,
                                  BlobId.Factory blobIdFactory) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
         this.blobStore = blobStore;
         this.blobIdFactory = blobIdFactory;
 
@@ -207,9 +202,9 @@ public class CassandraMessageDAOV3 {
             .setString(CONTENT_MD5, message.getProperties().getContentMD5())
             .setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
             .setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
-            .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), String.class)
-            .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), String.class, String.class)
-            .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), String.class, String.class);
+            .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), LIST_OF_STRINGS_CODEC)
+            .set(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), MAP_OF_STRINGS_CODEC)
+            .set(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC);
 
         if (message.getAttachments().isEmpty()) {
             return cassandraAsyncExecutor.executeVoid(boundStatement.unset(ATTACHMENTS).build());
@@ -266,9 +261,9 @@ public class CassandraMessageDAOV3 {
             .setString(CONTENT_MD5, message.getProperties().getContentMD5())
             .setString(CONTENT_TRANSFER_ENCODING, message.getProperties().getContentTransferEncoding())
             .setString(CONTENT_LOCATION, message.getProperties().getContentLocation())
-            .setList(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), String.class)
-            .setMap(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), String.class, String.class)
-            .setMap(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), String.class, String.class);
+            .set(CONTENT_LANGUAGE, message.getProperties().getContentLanguage(), LIST_OF_STRINGS_CODEC)
+            .set(CONTENT_DISPOSITION_PARAMETERS, message.getProperties().getContentDispositionParameters(), MAP_OF_STRINGS_CODEC)
+            .set(CONTENT_TYPE_PARAMETERS, message.getProperties().getContentTypeParameters(), MAP_OF_STRINGS_CODEC);
 
         if (message.getAttachments().isEmpty()) {
             return boundStatement.unset(ATTACHMENTS);
@@ -334,8 +329,7 @@ public class CassandraMessageDAOV3 {
     private Mono<Row> retrieveRow(CassandraMessageId messageId) {
         return cassandraAsyncExecutor.executeSingleRow(select
             .bind()
-            .setUuid(MESSAGE_ID, messageId.get())
-            .setExecutionProfile(lwtProfile));
+            .set(MESSAGE_ID, messageId.get(), TypeCodecs.TIMEUUID));
     }
 
     private Mono<MessageRepresentation> message(Row row, CassandraMessageId cassandraMessageId, FetchType fetchType) {
@@ -351,7 +345,7 @@ public class CassandraMessageDAOV3 {
                     row.getInt(BODY_START_OCTET),
                     content,
                     getProperties(row),
-                    getAttachments(row).collect(ImmutableList.toImmutableList()),
+                    getAttachments(row),
                     headerId,
                     bodyId));
     }
@@ -373,15 +367,16 @@ public class CassandraMessageDAOV3 {
         return property.build();
     }
 
-    private Stream<MessageAttachmentRepresentation> getAttachments(Row row) {
+    private List<MessageAttachmentRepresentation> getAttachments(Row row) {
         return Optional.ofNullable(row.get(ATTACHMENTS, attachmentCodec))
             .map(this::attachmentByIds)
-            .orElseGet(Stream::of);
+            .orElseGet(ImmutableList::of);
     }
 
-    private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UdtValue> udtValues) {
+    private List<MessageAttachmentRepresentation> attachmentByIds(List<UdtValue> udtValues) {
         return udtValues.stream()
-            .map(this::messageAttachmentByIdFrom);
+            .map(this::messageAttachmentByIdFrom)
+            .collect(ImmutableList.toImmutableList());
     }
 
     private MessageAttachmentRepresentation messageAttachmentByIdFrom(UdtValue udtValue) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 9382e7a62a..4d4177811a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -50,6 +50,8 @@ import java.time.Duration;
 import java.util.Date;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -87,6 +89,20 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdDAO {
+    private static class MemoizedSupplier<T> {
+        private final AtomicReference<T> value = new AtomicReference<>();
+
+        T get(Supplier<T> initializer) {
+            T result = value.get();
+            if (result == null) {
+                T initialValue = initializer.get();
+                value.set(initialValue);
+                return initialValue;
+            }
+            return result;
+        }
+    }
+
     private static final String IMAP_UID_GTE = IMAP_UID + "_GTE";
     private static final String IMAP_UID_LTE = IMAP_UID + "_LTE";
     public static final String LIMIT = "LIMIT_BIND_MARKER";
@@ -443,12 +459,17 @@ public class CassandraMessageIdDAO {
     }
 
     public Flux<MessageUid> listNotDeletedUids(CassandraId mailboxId, MessageRange range) {
+        MemoizedSupplier<Integer> deletedPosition = new MemoizedSupplier<>();
+        MemoizedSupplier<Integer> uidPosition = new MemoizedSupplier<>();
+
         return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind()
                 .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
                 .setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
                 .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
-            .filter(row -> !row.getBoolean(org.apache.james.mailbox.cassandra.table.Flag.DELETED))
-            .map(row -> MessageUid.of(row.getLong(IMAP_UID)));
+            .filter(row -> !TypeCodecs.BOOLEAN.decodePrimitive(
+                row.getBytesUnsafe(deletedPosition.get(() -> row.getColumnDefinitions().firstIndexOf(DELETED))), protocolVersion))
+            .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(
+                row.getBytesUnsafe(uidPosition.get(() -> row.getColumnDefinitions().firstIndexOf(IMAP_UID))), protocolVersion)));
     }
 
     private Flux<MessageUid> doListUids(CassandraId mailboxId, MessageRange range) {
@@ -521,7 +542,7 @@ public class CassandraMessageIdDAO {
     }
 
     private Optional<CassandraMessageMetadata> fromRowToComposedMessageIdWithFlags(Row row) {
-        UUID rowAsUuid = row.getUuid(MESSAGE_ID);
+        UUID rowAsUuid = row.get(MESSAGE_ID, TypeCodecs.TIMEUUID);
         if (rowAsUuid == null) {
             // Out of order updates with concurrent deletes can result in the row being partially deleted
             // We filter out such records, and cleanup them.


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org