You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/12/06 08:04:02 UTC
[james-project] 11/15: [PERF] Improve Cassandra rows interpretation in mailbox/cassandra
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 63d865dda3819f3a608beeca87557a8686219ac9
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 2 12:54:47 2022 +0700
[PERF] Improve Cassandra rows interpretation in mailbox/cassandra
---
.../cassandra/mail/CassandraMessageIdDAO.java | 23 +++++++++++++---------
.../mailbox/cassandra/mail/FlagsExtractor.java | 8 +++++++-
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index 8e4d26823a..edb99eb80c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -70,6 +70,7 @@ import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.util.streams.Limit;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -109,10 +110,13 @@ public class CassandraMessageIdDAO {
private final PreparedStatement selectUidRangeLimited;
private final PreparedStatement update;
private final PreparedStatement listStatement;
+ private final ProtocolVersion protocolVersion;
@Inject
public CassandraMessageIdDAO(CqlSession session, BlobId.Factory blobIdFactory) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+ this.protocolVersion = session.getContext().getProtocolVersion();
+
this.blobIdFactory = blobIdFactory;
this.delete = prepareDelete(session);
this.insert = prepareInsert(session);
@@ -417,7 +421,7 @@ public class CassandraMessageIdDAO {
public Flux<MessageUid> listUids(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeRows(selectAllUids.bind()
.set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID))
- .map(row -> MessageUid.of(row.getLong(0)));
+ .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion)));
}
public Flux<ComposedMessageIdWithMetaData> listMessagesMetadata(CassandraId mailboxId, MessageRange range) {
@@ -426,21 +430,21 @@ public class CassandraMessageIdDAO {
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
.setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
.map(row -> {
- CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
+ CassandraMessageId messageId = CassandraMessageId.Factory.of(row.get(MESSAGE_ID, TypeCodecs.TIMEUUID));
return ComposedMessageIdWithMetaData.builder()
- .modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
+ .modSeq(ModSeq.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(MOD_SEQ), protocolVersion)))
.threadId(getThreadIdFromRow(row, messageId))
.flags(FlagsExtractor.getFlags(row))
.composedMessageId(new ComposedMessageId(mailboxId,
messageId,
- MessageUid.of(row.getLong(IMAP_UID))))
+ MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(IMAP_UID), protocolVersion))))
.build();
});
}
public Flux<MessageUid> listNotDeletedUids(CassandraId mailboxId, MessageRange range) {
return cassandraAsyncExecutor.executeRows(selectNotDeletedRange.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
.setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
.filter(row -> !row.getBoolean(org.apache.james.mailbox.cassandra.table.Flag.DELETED))
@@ -449,10 +453,10 @@ public class CassandraMessageIdDAO {
private Flux<MessageUid> doListUids(CassandraId mailboxId, MessageRange range) {
return cassandraAsyncExecutor.executeRows(selectUidOnlyRange.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, range.getUidFrom().asLong())
.setLong(IMAP_UID_LTE, range.getUidTo().asLong()))
- .map(row -> MessageUid.of(row.getLong(IMAP_UID)));
+ .map(row -> MessageUid.of(TypeCodecs.BIGINT.decodePrimitive(row.getBytesUnsafe(0), protocolVersion)));
}
public Flux<MessageUid> listUids(CassandraId mailboxId, MessageRange range) {
@@ -517,7 +521,8 @@ public class CassandraMessageIdDAO {
}
private Optional<CassandraMessageMetadata> fromRowToComposedMessageIdWithFlags(Row row) {
- if (row.getUuid(MESSAGE_ID) == null) {
+ UUID rowAsUuid = row.getUuid(MESSAGE_ID);
+ if (rowAsUuid == null) {
// Out of order updates with concurrent deletes can result in the row being partially deleted
// We filter out such records, and cleanup them.
delete(CassandraId.of(row.getUuid(MAILBOX_ID)),
@@ -526,7 +531,7 @@ public class CassandraMessageIdDAO {
.subscribe();
return Optional.empty();
}
- final CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID));
+ CassandraMessageId messageId = CassandraMessageId.Factory.of(rowAsUuid);
return Optional.of(CassandraMessageMetadata.builder()
.ids(ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
index e20871266e..10e04eb35a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/FlagsExtractor.java
@@ -28,17 +28,23 @@ import javax.mail.Flags;
import org.apache.james.mailbox.cassandra.table.Flag;
import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
public class FlagsExtractor {
public static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = CodecRegistry.DEFAULT.codecFor(setOf(TEXT));
public static Flags getFlags(Row row) {
+ return getFlags(row, row.protocolVersion());
+ }
+
+ private static Flags getFlags(Row row, ProtocolVersion protocolVersion) {
Flags flags = new Flags();
for (CqlIdentifier cqlId : Flag.ALL_LOWERCASE) {
- if (row.getBoolean(cqlId)) {
+ if (TypeCodecs.BOOLEAN.decodePrimitive(row.getBytesUnsafe(cqlId), protocolVersion)) {
flags.add(Flag.JAVAX_MAIL_FLAG.get(cqlId));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org