You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/12/06 08:04:02 UTC

[james-project] 11/15: [PERF] Improve Cassandra rows interpretation in mailbox/cassandra

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 63d865dda3819f3a608beeca87557a8686219ac9
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 2 12:54:47 2022 +0700

    [PERF] Improve Cassandra rows interpretation in mailbox/cassandra
---
 .../cassandra/mail/CassandraMessageIdDAO.java      | 23 +++++++++++++---------
 .../mailbox/cassandra/mail/FlagsExtractor.java     |  8 +++++++-
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 8e4d26823a..edb99eb80c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -70,6 +70,7 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.util.streams.Limit;
 
 import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -109,10 +110,13 @@ public class CassandraMessageIdDAO {
     private final PreparedStatement selectUidRangeLimited;
     private final PreparedStatement update;
     private final PreparedStatement listStatement;
+    private final ProtocolVersion protocolVersion;
 
     @Inject
     public CassandraMessageIdDAO(CqlSession session, BlobId.Factory blobIdFactory) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.protocolVersion = session.getContext().getProtocolVersion();
+
         this.blobIdFactory = blobIdFactory;
         this.delete = prepareDelete(session);
         this.insert = prepareInsert(session);
@@ -417,7 +421,7 @@ public class CassandraMessageIdDAO {
     public Flux<MessageUid> listUids(CassandraId mailboxId) {
         return cassandraAsyncExecutor.executeRows(selectAllUids.bind()
                 .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID))
-            .map(row -> MessageUid.of(row.getLong(0)));
+            .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion)));
     }
 
     public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(CassandraId mailboxId, MessageRange range) {
@@ -426,21 +430,21 @@ public class CassandraMessageIdDAO {
                 .setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
                 .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
             .map(row -> {
-                CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
+                CassandraMessageId messageId = CassandraMessageId.Factory.of(row.get(MESSAGE_ID, TypeCodecs.TIMEUUID));
                 return ComposedMessageIdWithMetaData.builder()
-                    .modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
+                    .modSeq(ModSeq.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(MOD_SEQ), protocolVersion)))
                     .threadId(getThreadIdFromRow(row, messageId))
                     .flags(FlagsExtractor.getFlags(row))
                     .composedMessageId(new ComposedMessageId(mailboxId,
                         messageId,
-                        MessageUid.of(row.getLong(IMAP_UID))))
+                        MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(IMAP_UID), protocolVersion))))
                     .build();
             });
     }
 
     public Flux<MessageUid> listNotDeletedUids(CassandraId mailboxId, MessageRange range) {
         return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind()
-                .setUuid(MAILBOX_ID, mailboxId.asUuid())
+                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
                 .setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
                 .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
             .filter(row -> !row.getBoolean(org.apache.james.mailbox.cassandra.table.Flag.DELETED))
@@ -449,10 +453,10 @@ public class CassandraMessageIdDAO {
 
     private Flux<MessageUid> doListUids(CassandraId mailboxId, MessageRange range) {
         return cassandraAsyncExecutor.executeRows(selectUidOnlyRange.bind()
-                .setUuid(MAILBOX_ID, mailboxId.asUuid())
+                .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
                 .setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
                 .setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
-            .map(row -> MessageUid.of(row.getLong(IMAP_UID)));
+            .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion)));
     }
 
     public Flux<MessageUid> listUids(CassandraId mailboxId, MessageRange range) {
@@ -517,7 +521,8 @@ public class CassandraMessageIdDAO {
     }
 
     private Optional<CassandraMessageMetadata> fromRowToComposedMessageIdWithFlags(Row row) {
-        if (row.getUuid(MESSAGE_ID) == null) {
+        UUID rowAsUuid = row.getUuid(MESSAGE_ID);
+        if (rowAsUuid == null) {
             // Out of order updates with concurrent deletes can result in the row being partially deleted
             // We filter out such records, and cleanup them.
             delete(CassandraId.of(row.getUuid(MAILBOX_ID)),
@@ -526,7 +531,7 @@ public class CassandraMessageIdDAO {
                 .subscribe();
             return Optional.empty();
         }
-        final CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
+        CassandraMessageId messageId = CassandraMessageId.Factory.of(rowAsUuid);
         return Optional.of(CassandraMessageMetadata.builder()
             .ids(ComposedMessageIdWithMetaData.builder()
                 .composedMessageId(new ComposedMessageId(
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
index e20871266e..10e04eb35a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
@@ -28,17 +28,23 @@ import javax.mail.Flags;
 import org.apache.james.mailbox.cassandra.table.Flag;
 
 import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 
 public class FlagsExtractor {
     public static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = CodecRegistry.DEFAULT.codecFor(setOf(TEXT));
 
     public static Flags getFlags(Row row) {
+        return getFlags(row, row.protocolVersion());
+    }
+
+    private static Flags getFlags(Row row, ProtocolVersion protocolVersion) {
         Flags flags = new Flags();
         for (CqlIdentifier cqlId : Flag.ALL_LOWERCASE) {
-            if (row.getBoolean(cqlId)) {
+            if (TypeCodecs.BOOLEAN.decodePrimitive(row.getBytesUnsafe(cqlId), protocolVersion)) {
                 flags.add(Flag.JAVAX_MAIL_FLAG.get(cqlId));
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org