You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/12/06 08:04:03 UTC
[james-project] 12/15: [PERF] Improve Cassandra rows interpretation in mailbox/cassandra
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 64c6fb24a4d2be945549975495f5190265f7fda0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 2 12:56:18 2022 +0700
[PERF] Improve Cassandra rows interpretation in mailbox/cassandra
---
.../mailbox/cassandra/mail/CassandraACLDAOV2.java | 3 ++-
.../cassandra/mail/CassandraDeletedMessageDAO.java | 7 +++---
.../cassandra/mail/CassandraFirstUnseenDAO.java | 5 ++--
.../cassandra/mail/CassandraMailboxCounterDAO.java | 3 ++-
.../cassandra/mail/CassandraMailboxDAO.java | 16 ++++++++----
.../cassandra/mail/CassandraMailboxPathV3DAO.java | 17 +++++++------
.../cassandra/mail/CassandraMailboxRecentsDAO.java | 5 ++--
.../cassandra/mail/CassandraMessageDAOV3.java | 29 ++++++++++++----------
.../cassandra/mail/CassandraMessageIdDAO.java | 28 ++++++++++-----------
.../mail/CassandraMessageIdToImapUidDAO.java | 17 +++++++------
.../cassandra/mail/CassandraModSeqProvider.java | 9 ++++---
.../mailbox/cassandra/mail/CassandraThreadDAO.java | 19 +++++++-------
.../cassandra/mail/CassandraThreadLookupDAO.java | 19 +++++++++-----
.../cassandra/mail/CassandraUidProvider.java | 7 +++---
14 files changed, 105 insertions(+), 79 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
index 2958a7602b..86fbd1da50 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLDAOV2.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.model.MailboxACL;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -106,7 +107,7 @@ public class CassandraACLDAOV2 {
public Mono<MailboxACL> getACL(CassandraId cassandraId) {
return executor.executeRows(
read.bind()
- .setUuid(CassandraACLTable.ID, cassandraId.asUuid()))
+ .set(CassandraACLTable.ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID))
.map(Throwing.function(row -> {
MailboxACL.EntryKey entryKey = MailboxACL.EntryKey.deserialize(row.getString(CassandraACLV2Table.KEY));
MailboxACL.Rfc4314Rights rights = row.getSet(CassandraACLV2Table.RIGHTS, String.class)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 5aa632e956..ee4624d847 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -44,6 +44,7 @@ import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.google.common.collect.Lists;
import reactor.core.publisher.Flux;
@@ -186,12 +187,12 @@ public class CassandraDeletedMessageDAO {
public Mono<Void> removeAll(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
- .setUuid(MAILBOX_ID, cassandraId.asUuid()));
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID));
}
public Flux<MessageUid> retrieveDeletedMessage(CassandraId cassandraId, MessageRange range) {
return retrieveResultSetOfDeletedMessage(cassandraId, range)
- .map(row -> MessageUid.of(row.getLong(UID)));
+ .map(row -> MessageUid.of(row.getLong(0)));
}
private Flux<Row> retrieveResultSetOfDeletedMessage(CassandraId cassandraId, MessageRange range) {
@@ -212,7 +213,7 @@ public class CassandraDeletedMessageDAO {
private Flux<Row> retrieveAllDeleted(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeRows(
selectAllUidStatement.bind()
- .setUuid(MAILBOX_ID, cassandraId.asUuid()));
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID));
}
private Flux<Row> retrieveOneDeleted(CassandraId cassandraId, MessageUid uid) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index 9f3f5d78d2..bd5eeb2aff 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -43,6 +43,7 @@ import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.google.common.collect.Lists;
import reactor.core.publisher.Flux;
@@ -177,8 +178,8 @@ public class CassandraFirstUnseenDAO {
public Flux<MessageUid> listUnseen(CassandraId cassandraId) {
return cassandraAsyncExecutor.executeRows(
listStatement.bind()
- .setUuid(MAILBOX_ID, cassandraId.asUuid()))
- .map(row -> MessageUid.of(row.getLong(UID)));
+ .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID))
+ .map(row -> MessageUid.of(row.getLong(0)));
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index eff494fee9..d84d67c8e6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -41,6 +41,7 @@ import org.apache.james.mailbox.model.MailboxCounters;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.querybuilder.update.Assignment;
import reactor.core.publisher.Mono;
@@ -208,6 +209,6 @@ public class CassandraMailboxCounterDAO {
private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) {
return statement.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid());
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID);
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 0740401d7b..79dd21da8f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -52,6 +52,9 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.UdtValue;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -67,6 +70,7 @@ public class CassandraMailboxDAO {
private final PreparedStatement updateUidValidityStatement;
private final CqlSession session;
private final DriverExecutionProfile lwtProfile;
+ private final TypeCodec<UdtValue> mailboxBaseTypeCodec;
@Inject
public CassandraMailboxDAO(CqlSession session, CassandraTypesProvider typesProvider) {
@@ -80,6 +84,8 @@ public class CassandraMailboxDAO {
this.listStatement = prepareList();
this.readStatement = prepareRead();
this.lwtProfile = JamesExecutionProfiles.getLWTProfile(session);
+
+ this.mailboxBaseTypeCodec = CodecRegistry.DEFAULT.codecFor(typesProvider.getDefinedUserType(MAILBOX_BASE.asCql(true)));
}
private PreparedStatement prepareInsert() {
@@ -148,7 +154,7 @@ public class CassandraMailboxDAO {
public Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) {
return executor.executeSingleRow(readStatement.bind()
- .setUuid(ID, mailboxId.asUuid())
+ .set(ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setExecutionProfile(lwtProfile))
.flatMap(row -> mailboxFromRow(row, mailboxId));
}
@@ -156,12 +162,12 @@ public class CassandraMailboxDAO {
private Mono<Mailbox> mailboxFromRow(Row row, CassandraId cassandraId) {
return sanitizeUidValidity(cassandraId, row.getLong(UIDVALIDITY))
.map(uidValidity -> {
- UdtValue mailboxBase = row.getUdtValue(MAILBOX_BASE);
+ UdtValue mailboxBase = row.get(MAILBOX_BASE, mailboxBaseTypeCodec);
return new Mailbox(
new MailboxPath(
- mailboxBase.getString(CassandraMailboxTable.MailboxBase.NAMESPACE),
- Username.of(mailboxBase.getString(CassandraMailboxTable.MailboxBase.USER)),
- row.getString(NAME)),
+ mailboxBase.get(CassandraMailboxTable.MailboxBase.NAMESPACE, TypeCodecs.TEXT),
+ Username.of(mailboxBase.get(CassandraMailboxTable.MailboxBase.USER, TypeCodecs.TEXT)),
+ row.get(NAME, TypeCodecs.TEXT)),
uidValidity,
cassandraId);
});
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
index fdb4f76865..b2c67991ee 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
@@ -53,6 +53,7 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -128,9 +129,9 @@ public class CassandraMailboxPathV3DAO {
public Mono<Mailbox> retrieve(MailboxPath mailboxPath, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
BoundStatement statement = select.bind()
- .setString(NAMESPACE, mailboxPath.getNamespace())
- .setString(USER, sanitizeUser(mailboxPath.getUser()))
- .setString(MAILBOX_NAME, mailboxPath.getName());
+ .set(NAMESPACE, mailboxPath.getNamespace(), TypeCodecs.TEXT)
+ .set(USER, sanitizeUser(mailboxPath.getUser()), TypeCodecs.TEXT)
+ .set(MAILBOX_NAME, mailboxPath.getName(), TypeCodecs.TEXT);
return cassandraAsyncExecutor.executeSingleRow(setExecutionProfileIfNeeded(statement, consistencyChoice))
.map(row -> fromRow(row, mailboxPath.getUser(), mailboxPath.getNamespace(), mailboxPath.getName()))
@@ -140,8 +141,8 @@ public class CassandraMailboxPathV3DAO {
public Flux<Mailbox> listUserMailboxes(String namespace, Username user, JamesExecutionProfiles.ConsistencyChoice consistencyChoice) {
BoundStatementBuilder statementBuilder = selectUser.boundStatementBuilder()
- .setString(NAMESPACE, namespace)
- .setString(USER, sanitizeUser(user));
+ .set(NAMESPACE, namespace, TypeCodecs.TEXT)
+ .set(USER, sanitizeUser(user), TypeCodecs.TEXT);
if (consistencyChoice.equals(STRONG)) {
statementBuilder.setExecutionProfile(lwtProfile);
@@ -194,11 +195,11 @@ public class CassandraMailboxPathV3DAO {
}
private Mailbox fromRowToCassandraIdAndPath(Row row) {
- return fromRow(row, Username.of(row.getString(USER)), row.getString(NAMESPACE));
+ return fromRow(row, Username.of(row.get(USER, TypeCodecs.TEXT)), row.get(NAMESPACE, TypeCodecs.TEXT));
}
private Mailbox fromRow(Row row, Username username, String namespace) {
- return fromRow(row, username, namespace, row.getString(MAILBOX_NAME));
+ return fromRow(row, username, namespace, row.get(MAILBOX_NAME, TypeCodecs.TEXT));
}
private Mailbox fromRow(Row row, Username username, String namespace, String name) {
@@ -207,7 +208,7 @@ public class CassandraMailboxPathV3DAO {
username,
name),
UidValidity.of(row.getLong(UIDVALIDITY)),
- CassandraId.of(row.getUuid(MAILBOX_ID)));
+ CassandraId.of(row.get(MAILBOX_ID, TypeCodecs.TIMEUUID)));
}
public Mono<Boolean> save(Mailbox mailbox) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index 1d800e6510..da5552eaa5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -41,6 +41,7 @@ import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.google.common.collect.Lists;
import reactor.core.publisher.Flux;
@@ -98,13 +99,13 @@ public class CassandraMailboxRecentsDAO {
public Flux<MessageUid> getRecentMessageUidsInMailbox(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeRows(bindWithMailbox(mailboxId, readStatement))
- .map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))
+ .map(row -> row.getLong(0))
.map(MessageUid::of);
}
private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) {
return statement.bind()
- .setUuid(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid());
+ .set(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID);
}
public Mono<Void> removeFromRecent(CassandraId mailboxId, MessageUid messageUid) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 3b2ea9b4e4..ebe0675bcc 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -120,6 +120,7 @@ public class CassandraMessageDAOV3 {
private final Cid.CidParser cidParser;
private final DriverExecutionProfile lwtProfile;
private final UserDefinedType attachmentsType;
+ private final TypeCodec<List<UdtValue>> attachmentCodec;
@Inject
public CassandraMessageDAOV3(CqlSession session, CassandraTypesProvider typesProvider, BlobStore blobStore,
@@ -135,6 +136,7 @@ public class CassandraMessageDAOV3 {
this.listBlobs = prepareSelectBlobs(session);
this.cidParser = Cid.parser().relaxed();
this.attachmentsType = typesProvider.getDefinedUserType(ATTACHMENTS.asCql(true));
+ this.attachmentCodec = CodecRegistry.DEFAULT.codecFor(listOf(attachmentsType));
}
private PreparedStatement prepareSelect(CqlSession session) {
@@ -356,14 +358,14 @@ public class CassandraMessageDAOV3 {
private Properties getProperties(Row row) {
PropertyBuilder property = new PropertyBuilder();
- property.setContentDescription(row.getString(CONTENT_DESCRIPTION));
- property.setContentDispositionType(row.getString(CONTENT_DISPOSITION_TYPE));
- property.setMediaType(row.getString(MEDIA_TYPE));
- property.setSubType(row.getString(SUB_TYPE));
- property.setContentID(row.getString(CONTENT_ID));
- property.setContentMD5(row.getString(CONTENT_MD5));
- property.setContentTransferEncoding(row.getString(CONTENT_TRANSFER_ENCODING));
- property.setContentLocation(row.getString(CONTENT_LOCATION));
+ property.setContentDescription(row.get(CONTENT_DESCRIPTION, TypeCodecs.TEXT));
+ property.setContentDispositionType(row.get(CONTENT_DISPOSITION_TYPE, TypeCodecs.TEXT));
+ property.setMediaType(row.get(MEDIA_TYPE, TypeCodecs.TEXT));
+ property.setSubType(row.get(SUB_TYPE, TypeCodecs.TEXT));
+ property.setContentID(row.get(CONTENT_ID, TypeCodecs.TEXT));
+ property.setContentMD5(row.get(CONTENT_MD5, TypeCodecs.TEXT));
+ property.setContentTransferEncoding(row.get(CONTENT_TRANSFER_ENCODING, TypeCodecs.TEXT));
+ property.setContentLocation(row.get(CONTENT_LOCATION, TypeCodecs.TEXT));
property.setContentLanguage(row.get(CONTENT_LANGUAGE, LIST_OF_STRINGS_CODEC));
property.setContentDispositionParameters(row.get(CONTENT_DISPOSITION_PARAMETERS, MAP_OF_STRINGS_CODEC));
property.setContentTypeParameters(row.get(CONTENT_TYPE_PARAMETERS, MAP_OF_STRINGS_CODEC));
@@ -372,8 +374,9 @@ public class CassandraMessageDAOV3 {
}
private Stream<MessageAttachmentRepresentation> getAttachments(Row row) {
- List<UdtValue> udtValues = Optional.<List<UdtValue>>ofNullable(row.get(ATTACHMENTS, CodecRegistry.DEFAULT.codecFor(listOf(attachmentsType)))).orElse(List.of());
- return attachmentByIds(udtValues);
+ return Optional.ofNullable(row.get(ATTACHMENTS, attachmentCodec))
+ .map(this::attachmentByIds)
+ .orElseGet(Stream::of);
}
private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UdtValue> udtValues) {
@@ -383,9 +386,9 @@ public class CassandraMessageDAOV3 {
private MessageAttachmentRepresentation messageAttachmentByIdFrom(UdtValue udtValue) {
return MessageAttachmentRepresentation.builder()
- .attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
- .name(udtValue.getString(Attachments.NAME))
- .cid(cidParser.parse(udtValue.getString(Attachments.CID)))
+ .attachmentId(AttachmentId.from(udtValue.get(Attachments.ID, TypeCodecs.TEXT)))
+ .name(udtValue.get(Attachments.NAME, TypeCodecs.TEXT))
+ .cid(cidParser.parse(udtValue.get(Attachments.CID, TypeCodecs.TEXT)))
.isInline(udtValue.getBoolean(Attachments.IS_INLINE))
.build();
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index edb99eb80c..9382e7a62a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -295,7 +295,7 @@ public class CassandraMessageIdDAO {
public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) {
return cassandraAsyncExecutor.executeVoid(delete.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, uid.asLong()));
}
@@ -313,7 +313,7 @@ public class CassandraMessageIdDAO {
statementBuilder.setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()), String.class);
}
return cassandraAsyncExecutor.executeVoid(statementBuilder
- .setUuid(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+ .set(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, composedMessageId.getUid().asLong())
.setUuid(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
.setUuid(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get())
@@ -408,7 +408,7 @@ public class CassandraMessageIdDAO {
private Mono<Row> selectOneRow(CassandraId mailboxId, MessageUid uid) {
return cassandraAsyncExecutor.executeSingleRow(select.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, uid.asLong()));
}
@@ -490,32 +490,32 @@ public class CassandraMessageIdDAO {
private Flux<Row> selectAll(CassandraId mailboxId, Limit limit) {
return cassandraAsyncExecutor.executeRows(limit.getLimit()
.map(limitAsInt -> selectAllLimited.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setInt(LIMIT, limitAsInt))
.orElseGet(() -> selectAll.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())));
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)));
}
private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid, Limit limit) {
return cassandraAsyncExecutor.executeRows(limit.getLimit()
.map(limitAsInt -> selectUidGteLimited.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, uid.asLong())
.setInt(LIMIT, limitAsInt))
.orElseGet(() -> selectUidGte.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, uid.asLong())));
}
private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, MessageUid to, Limit limit) {
return cassandraAsyncExecutor.executeRows(limit.getLimit()
.map(limitAsInt -> selectUidRangeLimited.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, from.asLong())
.setLong(IMAP_UID_LTE, to.asLong())
.setInt(LIMIT, limitAsInt))
.orElseGet(() -> selectUidRange.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID_GTE, from.asLong())
.setLong(IMAP_UID_LTE, to.asLong())));
}
@@ -525,7 +525,7 @@ public class CassandraMessageIdDAO {
if (rowAsUuid == null) {
// Out of order updates with concurrent deletes can result in the row being partially deleted
// We filter out such records, and cleanup them.
- delete(CassandraId.of(row.getUuid(MAILBOX_ID)),
+ delete(CassandraId.of(row.get(MAILBOX_ID, TypeCodecs.TIMEUUID)),
MessageUid.of(row.getLong(IMAP_UID)))
.subscribeOn(Schedulers.parallel())
.subscribe();
@@ -535,7 +535,7 @@ public class CassandraMessageIdDAO {
return Optional.of(CassandraMessageMetadata.builder()
.ids(ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(
- CassandraId.of(row.getUuid(MAILBOX_ID)),
+ CassandraId.of(row.get(MAILBOX_ID, TypeCodecs.TIMEUUID)),
messageId,
MessageUid.of(row.getLong(IMAP_UID))))
.flags(FlagsExtractor.getFlags(row))
@@ -543,16 +543,16 @@ public class CassandraMessageIdDAO {
.threadId(getThreadIdFromRow(row, messageId))
.build())
.bodyStartOctet(row.get(BODY_START_OCTET, Integer.class))
- .internalDate(Optional.ofNullable(row.getInstant(INTERNAL_DATE))
+ .internalDate(Optional.ofNullable(row.get(INTERNAL_DATE, TypeCodecs.TIMESTAMP))
.map(Date::from))
.size(row.get(FULL_CONTENT_OCTETS, Long.class))
- .headerContent(Optional.ofNullable(row.getString(HEADER_CONTENT))
+ .headerContent(Optional.ofNullable(row.get(HEADER_CONTENT, TypeCodecs.TEXT))
.map(blobIdFactory::from))
.build());
}
private ThreadId getThreadIdFromRow(Row row, MessageId messageId) {
- UUID threadIdUUID = row.getUuid(THREAD_ID);
+ UUID threadIdUUID = row.get(THREAD_ID, TypeCodecs.TIMEUUID);
if (threadIdUUID == null) {
return ThreadId.fromBaseMessageId(messageId);
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 824f7f3038..19d450e1ac 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -75,6 +75,7 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.insert.Insert;
import com.datastax.oss.driver.api.querybuilder.update.Update;
@@ -253,8 +254,8 @@ public class CassandraMessageIdToImapUidDAO {
}
return cassandraAsyncExecutor.executeVoid(statementBuilder
- .setUuid(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
- .setUuid(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+ .set(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID)
+ .set(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, composedMessageId.getUid().asLong())
.setLong(MOD_SEQ, metadata.getComposedMessageId().getModSeq().asLong())
.setUuid(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get())
@@ -276,8 +277,8 @@ public class CassandraMessageIdToImapUidDAO {
ComposedMessageId composedMessageId = metadata.getComposedMessageId().getComposedMessageId();
Flags flags = metadata.getComposedMessageId().getFlags();
return cassandraAsyncExecutor.executeVoid(insertForced.bind()
- .setUuid(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
- .setUuid(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+ .set(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID)
+ .set(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, composedMessageId.getUid().asLong())
.setLong(MOD_SEQ, metadata.getComposedMessageId().getModSeq().asLong())
.setBoolean(ANSWERED, flags.contains(Flag.ANSWERED))
@@ -395,7 +396,7 @@ public class CassandraMessageIdToImapUidDAO {
.modSeq(ModSeq.of(row.getLong(MOD_SEQ)))
.build())
.bodyStartOctet(row.get(BODY_START_OCTET, Integer.class))
- .internalDate(Optional.ofNullable(row.getInstant(INTERNAL_DATE))
+ .internalDate(Optional.ofNullable(row.get(INTERNAL_DATE, TypeCodecs.TIMESTAMP))
.map(Date::from))
.size(row.get(FULL_CONTENT_OCTETS, Long.class))
.headerContent(Optional.ofNullable(row.getString(HEADER_CONTENT))
@@ -404,7 +405,7 @@ public class CassandraMessageIdToImapUidDAO {
}
private ThreadId getThreadIdFromRow(Row row, MessageId messageId) {
- UUID threadIdUUID = row.getUuid(THREAD_ID);
+ UUID threadIdUUID = row.get(THREAD_ID, TypeCodecs.TIMEUUID);
if (threadIdUUID == null) {
return ThreadId.fromBaseMessageId(messageId);
}
@@ -441,8 +442,8 @@ public class CassandraMessageIdToImapUidDAO {
}
return cassandraAsyncExecutor.executeVoid(statementBuilder
- .setUuid(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
- .setUuid(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+ .set(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get(), TypeCodecs.TIMEUUID)
+ .set(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid(), TypeCodecs.TIMEUUID)
.setLong(IMAP_UID, composedMessageId.getUid().asLong())
.setLong(MOD_SEQ, metadata.getComposedMessageId().getModSeq().asLong())
.setUuid(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get())
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index 3c7fbdf6b1..dd31a7bdcf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -49,6 +49,7 @@ import org.apache.james.mailbox.store.mail.ModSeqProvider;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -152,16 +153,16 @@ public class CassandraModSeqProvider implements ModSeqProvider {
private Mono<Optional<ModSeq>> findHighestModSeq(CassandraId mailboxId) {
return cassandraAsyncExecutor.executeSingleRowOptional(
select.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setExecutionProfile(lwtProfile))
- .map(maybeRow -> maybeRow.map(row -> ModSeq.of(row.getLong(NEXT_MODSEQ))));
+ .map(maybeRow -> maybeRow.map(row -> ModSeq.of(row.getLong(0))));
}
private Mono<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
ModSeq nextModSeq = modSeq.next();
return cassandraAsyncExecutor.executeReturnApplied(
insert.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(NEXT_MODSEQ, nextModSeq.asLong()))
.map(success -> successToModSeq(nextModSeq, success))
.handle(publishIfPresent());
@@ -171,7 +172,7 @@ public class CassandraModSeqProvider implements ModSeqProvider {
ModSeq nextModSeq = modSeq.next();
return cassandraAsyncExecutor.executeReturnApplied(
update.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(NEXT_MODSEQ, nextModSeq.asLong())
.setLong(MOD_SEQ_CONDITION, modSeq.asLong()))
.map(success -> successToModSeq(nextModSeq, success))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
index 8da46e864d..5af4b990c1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
@@ -49,6 +49,7 @@ import org.apache.james.mailbox.store.mail.model.Subject;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import reactor.core.publisher.Flux;
@@ -85,19 +86,19 @@ public class CassandraThreadDAO {
public Flux<Void> insertSome(Username username, Set<MimeMessageId> mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> baseSubject) {
return Flux.fromIterable(mimeMessageIds)
.flatMap(mimeMessageId -> executor.executeVoid(insertOne.bind()
- .setString(USERNAME, username.asString())
- .setString(MIME_MESSAGE_ID, mimeMessageId.getValue())
- .setUuid(MESSAGE_ID, ((CassandraMessageId) messageId).get())
- .setUuid(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get())
- .setString(BASE_SUBJECT, baseSubject.map(Subject::getValue).orElse(null))), DEFAULT_CONCURRENCY);
+ .set(USERNAME, username.asString(), TypeCodecs.TEXT)
+ .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT)
+ .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID)
+ .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
+ .set(BASE_SUBJECT, baseSubject.map(Subject::getValue).orElse(null), TypeCodecs.TEXT)), DEFAULT_CONCURRENCY);
}
public Flux<Pair<Optional<Subject>, ThreadId>> selectSome(Username username, Set<MimeMessageId> mimeMessageIds) {
return Flux.fromIterable(mimeMessageIds)
.flatMap(mimeMessageId -> executor
.executeSingleRow(selectOne.bind()
- .setString(USERNAME, username.asString())
- .setString(MIME_MESSAGE_ID, mimeMessageId.getValue()))
+ .set(USERNAME, username.asString(), TypeCodecs.TEXT)
+ .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT))
.map(this::readRow), DEFAULT_CONCURRENCY)
.distinct();
}
@@ -105,8 +106,8 @@ public class CassandraThreadDAO {
public Flux<Void> deleteSome(Username username, Set<MimeMessageId> mimeMessageIds) {
return Flux.fromIterable(mimeMessageIds)
.flatMap(mimeMessageId -> executor.executeVoid(deleteOne.bind()
- .setString(USERNAME, username.asString())
- .setString(MIME_MESSAGE_ID, mimeMessageId.getValue())));
+ .set(USERNAME, username.asString(), TypeCodecs.TEXT)
+ .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT)));
}
public Pair<Optional<Subject>, ThreadId> readRow(Row row) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
index ff6e96a04b..5913c0622b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -19,6 +19,8 @@
package org.apache.james.mailbox.cassandra.mail;
+import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
+import static com.datastax.oss.driver.api.core.type.DataTypes.frozenSetOf;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
@@ -42,11 +44,16 @@ import org.apache.james.mailbox.store.mail.model.MimeMessageId;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
+import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
public class CassandraThreadLookupDAO {
+ private static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = CodecRegistry.DEFAULT.codecFor(frozenSetOf(TEXT));
+
private final CassandraAsyncExecutor executor;
private final PreparedStatement insert;
private final PreparedStatement select;
@@ -75,24 +82,24 @@ public class CassandraThreadLookupDAO {
public Mono<Void> insert(MessageId messageId, Username username, Set<MimeMessageId> mimeMessageIds) {
Set<String> mimeMessageIdsString = mimeMessageIds.stream().map(MimeMessageId::getValue).collect(ImmutableSet.toImmutableSet());
return executor.executeVoid(insert.bind()
- .setUuid(MESSAGE_ID, ((CassandraMessageId) messageId).get())
- .setString(USERNAME, username.asString())
- .setSet(MIME_MESSAGE_IDS, mimeMessageIdsString, String.class));
+ .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID)
+ .set(USERNAME, username.asString(), TypeCodecs.TEXT)
+ .set(MIME_MESSAGE_IDS, mimeMessageIdsString, SET_OF_STRINGS_CODEC));
}
public Mono<ThreadTablePartitionKey> selectOneRow(MessageId messageId) {
return executor.executeSingleRow(
- select.bind().setUuid(MESSAGE_ID, ((CassandraMessageId) messageId).get()))
+ select.bind().set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID))
.map(this::readRow);
}
public Mono<Void> deleteOneRow(MessageId messageId) {
return executor.executeVoid(delete.bind()
- .setUuid(MESSAGE_ID, ((CassandraMessageId) messageId).get()));
+ .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID));
}
private ThreadTablePartitionKey readRow(Row row) {
- Set<MimeMessageId> mimeMessageIds = row.getSet(MIME_MESSAGE_IDS, String.class)
+ Set<MimeMessageId> mimeMessageIds = row.get(MIME_MESSAGE_IDS, SET_OF_STRINGS_CODEC)
.stream()
.map(MimeMessageId::new)
.collect(ImmutableSet.toImmutableSet());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index ef4eae061d..570ed05f9b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -49,6 +49,7 @@ import org.apache.james.mailbox.store.mail.UidProvider;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
@@ -165,9 +166,9 @@ public class CassandraUidProvider implements UidProvider {
private Mono<MessageUid> findHighestUid(CassandraId mailboxId) {
return executor.executeSingleRow(
selectStatement.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setExecutionProfile(lwtProfile))
- .map(row -> MessageUid.of(row.getLong(NEXT_UID)));
+ .map(row -> MessageUid.of(row.getLong(0)));
}
private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid) {
@@ -178,7 +179,7 @@ public class CassandraUidProvider implements UidProvider {
MessageUid nextUid = uid.next(count);
return executor.executeReturnApplied(
updateStatement.bind()
- .setUuid(MAILBOX_ID, mailboxId.asUuid())
+ .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.TIMEUUID)
.setLong(CONDITION, uid.asLong())
.setLong(NEXT_UID, nextUid.asLong()))
.map(success -> successToUid(nextUid, success))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org