You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:45 UTC
[james-project] 11/16: [REFACTORING] Simplify Cassandra Message POJO
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cf243e0f46c721eb0798bd756e03fdfa546b88b8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 12 11:03:33 2020 +0700
[REFACTORING] Simplify Cassandra Message POJO
MessageResult POJO is confusing as it holds the same name than a store POJO.
It's concept "not found" can efficiently be represented with reactor emptiness.
Thus we can "push" the attachmentRepresentation in the MessageRepresentation type
to keep a single unified POJO instead of a Pair, simplifying signature of the corresponding methods.
---
.../mailbox/cassandra/mail/AttachmentLoader.java | 7 +--
.../cassandra/mail/CassandraMessageDAO.java | 69 ++++++----------------
.../cassandra/mail/CassandraMessageIdMapper.java | 2 -
.../cassandra/mail/CassandraMessageMapper.java | 8 +--
...tAttachment.java => MessageRepresentation.java} | 14 +++--
.../cassandra/mail/CassandraMessageDAOTest.java | 15 ++---
6 files changed, 39 insertions(+), 76 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index ab281d0..c458fae 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.List;
import java.util.stream.Stream;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.mailbox.model.Attachment;
import org.apache.james.mailbox.model.MessageAttachment;
import org.apache.james.mailbox.store.mail.MessageMapper;
@@ -42,9 +41,9 @@ public class AttachmentLoader {
this.attachmentMapper = attachmentMapper;
}
- public Mono<MailboxMessage> addAttachmentToMessage(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageRepresentation, MessageMapper.FetchType fetchType) {
- return loadAttachments(messageRepresentation.getRight(), fetchType)
- .map(attachments -> messageRepresentation.getLeft().toMailboxMessage(attachments));
+ public Mono<MailboxMessage> addAttachmentToMessage(MessageRepresentation messageRepresentation, MessageMapper.FetchType fetchType) {
+ return loadAttachments(messageRepresentation.getAttachments().stream(), fetchType)
+ .map(messageRepresentation::toMailboxMessage);
}
private Mono<List<MessageAttachment>> loadAttachments(Stream<MessageAttachmentRepresentation> messageAttachmentRepresentations, MessageMapper.FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index d861ab9..fc92869 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -54,7 +54,6 @@ import javax.inject.Inject;
import javax.mail.util.SharedByteArrayInputStream;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -237,12 +236,12 @@ public class CassandraMessageDAO {
.collect(Guavate.toImmutableList());
}
- public Mono<MessageResult> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
+ public Mono<MessageRepresentation> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
return retrieveRow(id, fetchType)
.flatMap(resultSet -> message(resultSet, id, fetchType));
}
- public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+ public Flux<MessageRepresentation> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
.publishOn(Schedulers.elastic())
.flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
@@ -257,31 +256,29 @@ public class CassandraMessageDAO {
.setConsistencyLevel(QUORUM));
}
- private Mono<MessageResult>
+ private Mono<MessageRepresentation>
message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
if (rows.isExhausted()) {
- return Mono.just(notFound(messageIdWithMetaData));
+ return Mono.empty();
}
Row row = rows.one();
- return buildContentRetriever(fetchType, row).map(content -> {
- MessageWithoutAttachment messageWithoutAttachment =
- new MessageWithoutAttachment(
- messageId.getMessageId(),
- row.getTimestamp(INTERNAL_DATE),
- row.getLong(FULL_CONTENT_OCTETS),
- row.getInt(BODY_START_OCTET),
- new SharedByteArrayInputStream(content),
- messageIdWithMetaData.getFlags(),
- getPropertyBuilder(row),
- messageId.getMailboxId(),
- messageId.getUid(),
- messageIdWithMetaData.getModSeq(),
- hasAttachment(row));
- return found(Pair.of(messageWithoutAttachment, getAttachments(row)));
- });
+ return buildContentRetriever(fetchType, row).map(content ->
+ new MessageRepresentation(
+ messageId.getMessageId(),
+ row.getTimestamp(INTERNAL_DATE),
+ row.getLong(FULL_CONTENT_OCTETS),
+ row.getInt(BODY_START_OCTET),
+ new SharedByteArrayInputStream(content),
+ messageIdWithMetaData.getFlags(),
+ getPropertyBuilder(row),
+ messageId.getMailboxId(),
+ messageId.getUid(),
+ messageIdWithMetaData.getModSeq(),
+ hasAttachment(row),
+ getAttachments(row).collect(Guavate.toImmutableList())));
}
private PropertyBuilder getPropertyBuilder(Row row) {
@@ -374,36 +371,6 @@ public class CassandraMessageDAO {
return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field))));
}
- public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
- return new MessageResult(id, Optional.empty());
- }
-
- public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
- return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
- }
-
- public static class MessageResult {
- private final ComposedMessageIdWithMetaData metaData;
- private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;
-
- public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
- this.metaData = metaData;
- this.message = message;
- }
-
- public ComposedMessageIdWithMetaData getMetadata() {
- return metaData;
- }
-
- public boolean isFound() {
- return message.isPresent();
- }
-
- public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
- return message.get();
- }
- }
-
public Flux<MessageIdAttachmentIds> retrieveAllMessageIdAttachmentIds() {
return cassandraAsyncExecutor.executeRows(
selectAllMessagesWithAttachment.bind()
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 8e07fda..26caba1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -96,8 +96,6 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.publishOn(Schedulers.elastic())
.flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
.flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
- .filter(CassandraMessageDAO.MessageResult::isFound)
- .map(CassandraMessageDAO.MessageResult::message)
.flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.groupBy(MailboxMessage::getMailboxId)
.flatMap(this::keepMessageIfMailboxExists)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7f7a0b7..b7e6ed3 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -177,9 +177,7 @@ public class CassandraMessageMapper implements MessageMapper {
private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
return messageDAO.retrieveMessages(messageIds, fetchType, limit)
- .filter(CassandraMessageDAO.MessageResult::isFound)
- .map(CassandraMessageDAO.MessageResult::message)
- .flatMap(stream -> attachmentLoader.addAttachmentToMessage(stream, fetchType));
+ .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType));
}
@Override
@@ -221,9 +219,7 @@ public class CassandraMessageMapper implements MessageMapper {
return retrieveComposedId(mailboxId, messageUid)
.flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
.flatMapMany(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata))
- .filter(CassandraMessageDAO.MessageResult::isFound)
- .map(CassandraMessageDAO.MessageResult::message)
- .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
+ .map(pair -> pair.toMailboxMessage(ImmutableList.of()));
}
private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
similarity index 85%
rename from mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java
rename to mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
index 618ebb5..728f0d2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageWithoutAttachment.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
@@ -35,7 +35,7 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-public class MessageWithoutAttachment {
+public class MessageRepresentation {
private final MessageId messageId;
private final Date internalDate;
private final Long size;
@@ -47,10 +47,11 @@ public class MessageWithoutAttachment {
private final MessageUid messageUid;
private final ModSeq modSeq;
private final boolean hasAttachment;
+ private final List<MessageAttachmentRepresentation> attachments;
- public MessageWithoutAttachment(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
- Flags flags, PropertyBuilder propertyBuilder, MailboxId mailboxId, MessageUid messageUid, ModSeq modSeq,
- boolean hasAttachment) {
+ public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
+ Flags flags, PropertyBuilder propertyBuilder, MailboxId mailboxId, MessageUid messageUid, ModSeq modSeq,
+ boolean hasAttachment, List<MessageAttachmentRepresentation> attachments) {
this.messageId = messageId;
this.internalDate = internalDate;
this.size = size;
@@ -62,6 +63,7 @@ public class MessageWithoutAttachment {
this.messageUid = messageUid;
this.modSeq = modSeq;
this.hasAttachment = hasAttachment;
+ this.attachments = attachments;
}
public SimpleMailboxMessage toMailboxMessage(List<MessageAttachment> attachments) {
@@ -100,4 +102,8 @@ public class MessageWithoutAttachment {
public PropertyBuilder getPropertyBuilder() {
return propertyBuilder;
}
+
+ public List<MessageAttachmentRepresentation> getAttachments() {
+ return attachments;
+ }
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
index bec5100..daf4767 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -32,7 +32,6 @@ import javax.mail.Flags;
import javax.mail.util.SharedByteArrayInputStream;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -111,7 +110,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
- MessageWithoutAttachment attachmentRepresentation =
+ MessageRepresentation attachmentRepresentation =
toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
@@ -127,7 +126,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
- MessageWithoutAttachment attachmentRepresentation =
+ MessageRepresentation attachmentRepresentation =
toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
@@ -139,7 +138,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
- MessageWithoutAttachment attachmentRepresentation =
+ MessageRepresentation attachmentRepresentation =
toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
@@ -152,7 +151,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
- MessageWithoutAttachment attachmentRepresentation =
+ MessageRepresentation attachmentRepresentation =
toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
byte[] expected = Bytes.concat(
@@ -168,7 +167,7 @@ class CassandraMessageDAOTest {
testee.save(message).block();
- MessageWithoutAttachment attachmentRepresentation =
+ MessageRepresentation attachmentRepresentation =
toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), StandardCharsets.UTF_8))
@@ -190,10 +189,8 @@ class CassandraMessageDAOTest {
.build();
}
- private MessageWithoutAttachment toMessage(Flux<CassandraMessageDAO.MessageResult> read) {
+ private MessageRepresentation toMessage(Flux<MessageRepresentation> read) {
return read.toStream()
- .map(CassandraMessageDAO.MessageResult::message)
- .map(Pair::getLeft)
.findAny()
.orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org