You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/08/22 03:31:59 UTC
[4/9] james-project git commit: JAMES-2111
s/CassandraMessageDAOV2/CassandraMessageDAO/g
JAMES-2111 s/CassandraMessageDAOV2/CassandraMessageDAO/g
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9360cb3a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9360cb3a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9360cb3a
Branch: refs/heads/master
Commit: 9360cb3a3ad6c0543c036beceb7b608d1f6f637d
Parents: 51a1311
Author: benwa <bt...@linagora.com>
Authored: Tue Aug 15 15:55:56 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 22 10:31:07 2017 +0700
----------------------------------------------------------------------
.../CassandraMailboxSessionMapperFactory.java | 16 +-
.../cassandra/mail/CassandraMessageDAO.java | 398 +++++++++++++++++++
.../cassandra/mail/CassandraMessageDAOV2.java | 398 -------------------
.../mail/CassandraMessageIdMapper.java | 14 +-
.../cassandra/mail/CassandraMessageMapper.java | 20 +-
.../CassandraMailboxManagerProvider.java | 6 +-
.../CassandraSubscriptionManagerTest.java | 6 +-
.../cassandra/CassandraTestSystemFixture.java | 6 +-
.../CassandraMailboxManagerAttachmentTest.java | 4 +-
.../cassandra/mail/CassandraMapperProvider.java | 5 +-
.../cassandra/mail/CassandraMessageDAOTest.java | 191 +++++++++
.../mail/CassandraMessageDAOV2Test.java | 191 ---------
.../cassandra/host/CassandraHostSystem.java | 6 +-
.../modules/mailbox/CassandraMailboxModule.java | 4 +-
14 files changed, 632 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index e02d5cd..020e6a1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -35,7 +35,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
@@ -63,7 +63,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
private final Session session;
private final CassandraUidProvider uidProvider;
private final CassandraModSeqProvider modSeqProvider;
- private final CassandraMessageDAOV2 messageDAOV2;
+ private final CassandraMessageDAO messageDAO;
private final CassandraMessageIdDAO messageIdDAO;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMailboxCounterDAO mailboxCounterDAO;
@@ -79,7 +79,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
@Inject
public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
- CassandraMessageDAOV2 messageDAOV2,
+ CassandraMessageDAO messageDAO,
CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, CassandraMailboxDAO mailboxDAO,
CassandraMailboxPathDAO mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, CassandraApplicableFlagDAO applicableFlagDAO,
@@ -87,7 +87,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
this.uidProvider = uidProvider;
this.modSeqProvider = modSeqProvider;
this.session = session;
- this.messageDAOV2 = messageDAOV2;
+ this.messageDAO = messageDAO;
this.messageIdDAO = messageIdDAO;
this.imapUidDAO = imapUidDAO;
this.mailboxCounterDAO = mailboxCounterDAO;
@@ -111,7 +111,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
CassandraUidProvider uidProvider,
CassandraModSeqProvider modSeqProvider,
Session session,
- CassandraMessageDAOV2 messageDAOV2,
+ CassandraMessageDAO messageDAO,
CassandraMessageIdDAO messageIdDAO,
CassandraMessageIdToImapUidDAO imapUidDAO,
CassandraMailboxCounterDAO mailboxCounterDAO,
@@ -122,7 +122,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
CassandraApplicableFlagDAO applicableFlagDAO,
CassandraDeletedMessageDAO deletedMesageDAO) {
- this(uidProvider, modSeqProvider, session, messageDAOV2, messageIdDAO, imapUidDAO, mailboxCounterDAO,
+ this(uidProvider, modSeqProvider, session, messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO,
mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO,
CassandraUtils.WITH_DEFAULT_CONFIGURATION, CassandraConfiguration.DEFAULT_CONFIGURATION);
}
@@ -134,7 +134,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
modSeqProvider,
null,
(CassandraAttachmentMapper) createAttachmentMapper(mailboxSession),
- messageDAOV2,
+ messageDAO,
messageIdDAO,
imapUidDAO,
mailboxCounterDAO,
@@ -150,7 +150,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException {
return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO,
(CassandraAttachmentMapper) getAttachmentMapper(mailboxSession),
- imapUidDAO, messageIdDAO, messageDAOV2, indexTableHandler, modSeqProvider, mailboxSession,
+ imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession,
cassandraConfiguration);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
new file mode 100644
index 0000000..5129f3b
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -0,0 +1,398 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_CONTENT;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_OCTECTS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_START_OCTET;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FIELDS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FULL_CONTENT_OCTETS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADERS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADER_CONTENT;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.INTERNAL_DATE;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.METADATA;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.PROPERTIES;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.mailbox.cassandra.ids.BlobId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Attachments;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Properties;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.AttachmentId;
+import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.Property;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.OptionalConverter;
+import org.apache.james.util.streams.JamesCollectors;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+public class CassandraMessageDAO {
+ public static final long DEFAULT_LONG_VALUE = 0L;
+ public static final String DEFAULT_OBJECT_VALUE = null;
+ private static final byte[] EMPTY_BYTE_ARRAY = {};
+
+ private final CassandraAsyncExecutor cassandraAsyncExecutor;
+ private final CassandraTypesProvider typesProvider;
+ private final CassandraBlobsDAO blobsDAO;
+ private final CassandraConfiguration configuration;
+ private final PreparedStatement insert;
+ private final PreparedStatement delete;
+ private final PreparedStatement selectMetadata;
+ private final PreparedStatement selectHeaders;
+ private final PreparedStatement selectFields;
+ private final PreparedStatement selectBody;
+ private final Cid.CidParser cidParser;
+
+ /**
+ * Creates the DAO and prepares, against the given session, every statement it later executes
+ * (insert, delete and one select per fetch type).
+ *
+ * @param session Cassandra session used to prepare and execute the statements
+ * @param typesProvider provider of the user defined types bound for properties and attachments
+ * @param blobsDAO DAO through which header and body content are saved and read
+ * @param cassandraConfiguration configuration providing the message read chunk size
+ */
+ @Inject
+ public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) {
+ this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+ this.typesProvider = typesProvider;
+ this.blobsDAO = blobsDAO;
+ this.configuration = cassandraConfiguration;
+ this.insert = prepareInsert(session);
+ this.delete = prepareDelete(session);
+ // One prepared select per group of columns; retrieveSelect picks among them by FetchType.
+ this.selectMetadata = prepareSelect(session, METADATA);
+ this.selectHeaders = prepareSelect(session, HEADERS);
+ this.selectFields = prepareSelect(session, FIELDS);
+ this.selectBody = prepareSelect(session, BODY);
+ // Parser used when attachment CIDs are read back from the attachments column.
+ this.cidParser = Cid.parser().relaxed();
+ }
+
+ /**
+ * Test-oriented constructor: same as the injected constructor but relying on
+ * {@code CassandraConfiguration.DEFAULT_CONFIGURATION}.
+ */
+ @VisibleForTesting
+ public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
+ this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION);
+ }
+
+ /**
+ * Prepares a select of the given columns from the message table, filtered on a message id
+ * bind marker.
+ *
+ * @param session session used to prepare the statement
+ * @param fields names of the columns to select
+ * @return the prepared select statement
+ */
+ private PreparedStatement prepareSelect(Session session, String[] fields) {
+ return session.prepare(select(fields)
+ .from(TABLE_NAME)
+ .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+ }
+
+ /**
+ * Prepares the insert statement of the message table. Each column is bound through a marker
+ * named after the column itself, which is what boundWriteStatement relies on.
+ *
+ * @param session session used to prepare the statement
+ * @return the prepared insert statement
+ */
+ private PreparedStatement prepareInsert(Session session) {
+ return session.prepare(insertInto(TABLE_NAME)
+ .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+ .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
+ .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
+ .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
+ .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
+ .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
+ .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
+ .value(PROPERTIES, bindMarker(PROPERTIES))
+ .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
+ .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
+ }
+
+ /**
+ * Prepares the statement deleting the row of the message table matching a message id bind marker.
+ *
+ * @param session session used to prepare the statement
+ * @return the prepared delete statement
+ */
+ private PreparedStatement prepareDelete(Session session) {
+ return session.prepare(QueryBuilder.delete()
+ .from(TABLE_NAME)
+ .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+ }
+
+ /**
+ * Stores a message: its header and body content are saved as blobs first, then the message row
+ * referencing the resulting blob ids is written.
+ *
+ * @param message the message to persist
+ * @return a future completing once the message row has been written
+ * @throws MailboxException when the message content cannot be read
+ */
+ public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
+     return saveContent(message)
+         .thenCompose(blobIds -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, blobIds)));
+ }
+
+ /**
+ * Saves the header and the body of the message as two separate blobs.
+ *
+ * @param message the message whose content has to be stored
+ * @return a future of the pair (header blob id, body blob id)
+ * @throws MailboxException when reading the header or body content fails
+ */
+ private CompletableFuture<Pair<Optional<BlobId>, Optional<BlobId>>> saveContent(MailboxMessage message) throws MailboxException {
+     try {
+         // Header is read and submitted before the body is read, as the two saves are independent.
+         CompletableFuture<Optional<BlobId>> headerBlobId = blobsDAO.save(IOUtils.toByteArray(message.getHeaderContent()));
+         CompletableFuture<Optional<BlobId>> bodyBlobId = blobsDAO.save(IOUtils.toByteArray(message.getBodyContent()));
+
+         return CompletableFutureUtil.combine(headerBlobId, bodyBlobId, Pair::of);
+     } catch (IOException e) {
+         throw new MailboxException("Error saving mail content", e);
+     }
+ }
+
+ /**
+ * Binds the insert statement with the values of the given message.
+ *
+ * @param message the message being saved
+ * @param pair blob ids of the saved content: header on the left, body on the right
+ * @return the insert statement ready to be executed
+ */
+ private BoundStatement boundWriteStatement(MailboxMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
+     CassandraMessageId cassandraMessageId = (CassandraMessageId) message.getMessageId();
+     // An absent blob id is written as DEFAULT_OBJECT_VALUE (null).
+     String bodyBlobId = pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE);
+     String headerBlobId = pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE);
+     // A missing textual line count is written as DEFAULT_LONG_VALUE.
+     long textualLineCount = Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE);
+
+     return insert.bind()
+         .setUUID(MESSAGE_ID, cassandraMessageId.get())
+         .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+         .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
+         .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
+         .setLong(BODY_OCTECTS, message.getBodyOctets())
+         .setString(BODY_CONTENT, bodyBlobId)
+         .setString(HEADER_CONTENT, headerBlobId)
+         .setLong(TEXTUAL_LINE_COUNT, textualLineCount)
+         .setList(PROPERTIES, buildPropertiesUdt(message))
+         .setList(ATTACHMENTS, buildAttachmentUdt(message));
+ }
+
+ /**
+ * Converts the attachments of the message into the UDT values bound to the attachments column.
+ *
+ * @param message the message whose attachments are converted
+ * @return one UDT value per attachment of the message
+ */
+ private ImmutableList<UDTValue> buildAttachmentUdt(MailboxMessage message) {
+ return message.getAttachments().stream()
+ .map(this::toUDT)
+ .collect(Guavate.toImmutableList());
+ }
+
+ /**
+ * Converts properties into UDT values of the properties user defined type, carrying the
+ * namespace, local name and value of each property.
+ *
+ * NOTE(review): no caller of this overload is visible in this class; the MailboxMessage overload
+ * repeats the same mapping.
+ *
+ * @param properties the properties to convert
+ * @return one UDT value per property
+ */
+ private List<UDTValue> buildPropertiesUdt(List<Property> properties) {
+ return properties.stream()
+ .map(property -> typesProvider.getDefinedUserType(PROPERTIES)
+ .newValue()
+ .setString(Properties.NAMESPACE, property.getNamespace())
+ .setString(Properties.NAME, property.getLocalName())
+ .setString(Properties.VALUE, property.getValue()))
+ .collect(Guavate.toImmutableList());
+ }
+
+ /**
+ * Converts an attachment of a message into a UDT value of the attachments user defined type.
+ * An absent name or CID is written as null.
+ *
+ * @param messageAttachment the attachment to convert
+ * @return the UDT value holding id, name, CID and inline flag of the attachment
+ */
+ private UDTValue toUDT(MessageAttachment messageAttachment) {
+     String attachmentId = messageAttachment.getAttachmentId().getId();
+     String attachmentName = messageAttachment.getName().orNull();
+     String cidValue = messageAttachment.getCid().transform(Cid::getValue).orNull();
+     boolean inline = messageAttachment.isInline();
+
+     return typesProvider.getDefinedUserType(ATTACHMENTS)
+         .newValue()
+         .setString(Attachments.ID, attachmentId)
+         .setString(Attachments.NAME, attachmentName)
+         .setString(Attachments.CID, cidValue)
+         .setBool(Attachments.IS_INLINE, inline);
+ }
+
+ /**
+ * Converts the properties of the message into the UDT values bound to the properties column.
+ *
+ * Delegates to the {@code List<Property>} overload so that the property to UDT mapping is
+ * written only once in this class.
+ *
+ * @param message the message whose properties are converted
+ * @return one UDT value per property of the message
+ */
+ private List<UDTValue> buildPropertiesUdt(MailboxMessage message) {
+     return buildPropertiesUdt(message.getProperties());
+ }
+
+ /**
+ * Retrieves the messages matching the given ids.
+ *
+ * Duplicated ids are dropped, the limit is applied, and the remaining ids are grouped in chunks
+ * of {@code configuration.getMessageReadChunkSize()} handed to {@code CompletableFutureUtil.chainAll};
+ * the per-chunk results are then flattened into a single stream.
+ *
+ * @param messageIds ids, with their metadata, of the messages to load
+ * @param fetchType part of the message that has to be loaded
+ * @param limit limit applied on the stream of distinct ids
+ * @return a future of the results, found or not, for the retained ids
+ */
+ public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+ return CompletableFutureUtil.chainAll(
+ limit.applyOnStream(messageIds.stream().distinct())
+ .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
+ ids -> rowToMessages(fetchType, ids))
+ .thenApply(stream -> stream.flatMap(Function.identity()));
+ }
+
+ /**
+ * Loads one chunk of messages: a select is issued for each id and its result set is turned
+ * into a {@link MessageResult}.
+ *
+ * @param fetchType part of the message that has to be loaded
+ * @param ids ids of the chunk
+ * @return a future of the results of the chunk
+ */
+ private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
+ return FluentFutureStream.of(
+ ids.stream()
+ .map(id -> retrieveRow(id, fetchType)
+ .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
+ .completableFuture();
+ }
+
+ /**
+ * Executes the select statement matching the fetch type for the given message.
+ *
+ * @param messageId id, with metadata, of the message to read
+ * @param fetchType part of the message that has to be loaded
+ * @return a future of the result set of the select
+ */
+ private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+     CassandraMessageId id = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
+     BoundStatement boundSelect = retrieveSelect(fetchType)
+         .bind()
+         .setUUID(MESSAGE_ID, id.get());
+
+     return cassandraAsyncExecutor.execute(boundSelect);
+ }
+
+ /**
+ * Builds the result for one message out of the result set of its select.
+ *
+ * An exhausted result set gives a "not found" result. Otherwise the first row is used: the
+ * content required by the fetch type is loaded, then the message and its attachment
+ * representations are assembled.
+ *
+ * @param rows result set of the select issued for the message
+ * @param messageIdWithMetaData id and metadata (flags, mod-seq) of the requested message
+ * @param fetchType part of the message that has to be loaded
+ * @return a future of the result, found or not
+ */
+ private CompletableFuture<MessageResult> message(ResultSet rows, ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
+     if (rows.isExhausted()) {
+         return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
+     }
+
+     ComposedMessageId composedId = messageIdWithMetaData.getComposedMessageId();
+     Row row = rows.one();
+
+     return buildContentRetriever(fetchType)
+         .apply(row)
+         .thenApply(content -> found(Pair.of(
+             new MessageWithoutAttachment(
+                 composedId.getMessageId(),
+                 row.getTimestamp(INTERNAL_DATE),
+                 row.getLong(FULL_CONTENT_OCTETS),
+                 row.getInt(BODY_START_OCTET),
+                 new SharedByteArrayInputStream(content),
+                 messageIdWithMetaData.getFlags(),
+                 getPropertyBuilder(row),
+                 composedId.getMailboxId(),
+                 composedId.getUid(),
+                 messageIdWithMetaData.getModSeq()),
+             getAttachments(row, fetchType))));
+ }
+
+ /**
+ * Rebuilds the property builder of a message from its row: one property per UDT value of the
+ * properties column, plus the textual line count column.
+ *
+ * @param row row of the message table
+ * @return the property builder holding the stored properties
+ */
+ private PropertyBuilder getPropertyBuilder(Row row) {
+     PropertyBuilder builder = new PropertyBuilder(
+         row.getList(PROPERTIES, UDTValue.class).stream()
+             .map(udt -> new SimpleProperty(
+                 udt.getString(Properties.NAMESPACE),
+                 udt.getString(Properties.NAME),
+                 udt.getString(Properties.VALUE)))
+             .collect(Collectors.toList()));
+     builder.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
+     return builder;
+ }
+
+ /**
+ * Reads the attachment representations of a message. The attachments column is only read for
+ * the Full and Body fetch types; any other fetch type gives an empty stream.
+ *
+ * @param row row of the message table
+ * @param fetchType part of the message being loaded
+ * @return the attachment representations stored in the row, or an empty stream
+ */
+ private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) {
+     switch (fetchType) {
+         case Full:
+         case Body:
+             return row.getList(ATTACHMENTS, UDTValue.class)
+                 .stream()
+                 .map(this::messageAttachmentByIdFrom);
+         default:
+             return Stream.of();
+     }
+ }
+
+ /**
+ * Converts the UDT values of the attachments column into attachment representations.
+ *
+ * @param udtValues values read from the attachments column
+ * @return one attachment representation per UDT value
+ */
+ private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UDTValue> udtValues) {
+ return udtValues.stream()
+ .map(this::messageAttachmentByIdFrom);
+ }
+
+ /**
+ * Builds an attachment representation from a UDT value of the attachments column: attachment
+ * id, name, CID (parsed with the parser configured in the constructor) and inline flag.
+ *
+ * @param udtValue value of the attachments user defined type
+ * @return the matching attachment representation
+ */
+ private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue udtValue) {
+     AttachmentId attachmentId = AttachmentId.from(udtValue.getString(Attachments.ID));
+     String attachmentName = udtValue.getString(Attachments.NAME);
+     boolean inline = udtValue.getBool(Attachments.IS_INLINE);
+
+     return MessageAttachmentRepresentation.builder()
+         .attachmentId(attachmentId)
+         .name(attachmentName)
+         .cid(OptionalConverter.fromGuava(cidParser.parse(udtValue.getString(Attachments.CID))))
+         .isInline(inline)
+         .build();
+ }
+
+ /**
+ * Picks the prepared select statement matching the fetch type.
+ *
+ * @param fetchType part of the message that has to be loaded
+ * @return the select prepared for that fetch type
+ * @throws RuntimeException when the fetch type is none of Body, Full, Headers or Metadata
+ */
+ private PreparedStatement retrieveSelect(FetchType fetchType) {
+ switch (fetchType) {
+ case Body:
+ return selectBody;
+ case Full:
+ // Full fetches go through the select prepared with FIELDS.
+ return selectFields;
+ case Headers:
+ return selectHeaders;
+ case Metadata:
+ return selectMetadata;
+ default:
+ throw new RuntimeException("Unknown FetchType " + fetchType);
+ }
+ }
+
+ /**
+ * Deletes the row of the message table matching the given message id.
+ *
+ * @param messageId id of the message to delete
+ * @return a future completing once the delete statement has been executed
+ */
+ public CompletableFuture<Void> delete(CassandraMessageId messageId) {
+     BoundStatement boundDelete = delete.bind()
+         .setUUID(MESSAGE_ID, messageId.get());
+
+     return cassandraAsyncExecutor.executeVoid(boundDelete);
+ }
+
+ /**
+ * Gives the function loading, from a row, the content required by the fetch type.
+ *
+ * @param fetchType part of the message that has to be loaded
+ * @return a function turning a row into a future of the content bytes
+ * @throws RuntimeException when the fetch type is none of Full, Headers, Body or Metadata
+ */
+ private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
+ switch (fetchType) {
+ case Full:
+ return this::getFullContent;
+ case Headers:
+ return this::getHeaderContent;
+ case Body:
+ // The body is prefixed with BODY_START_OCTET zero bytes instead of the headers;
+ // presumably so that the body stays at its offset within the full content - TODO confirm.
+ return row -> getBodyContent(row)
+ .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+ case Metadata:
+ // No blob is read for metadata only fetches.
+ return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
+ default:
+ throw new RuntimeException("Unknown FetchType " + fetchType);
+ }
+ }
+
+ /**
+ * Loads the full content of a message: header bytes followed by body bytes.
+ *
+ * @param row row of the message table
+ * @return a future of the concatenated header and body content
+ */
+ private CompletableFuture<byte[]> getFullContent(Row row) {
+ return CompletableFutureUtil.combine(
+ getHeaderContent(row),
+ getBodyContent(row),
+ Bytes::concat);
+ }
+
+ /** Loads the blob referenced by the body content column of the row. */
+ private CompletableFuture<byte[]> getBodyContent(Row row) {
+ return getFieldContent(BODY_CONTENT, row);
+ }
+
+ /** Loads the blob referenced by the header content column of the row. */
+ private CompletableFuture<byte[]> getHeaderContent(Row row) {
+ return getFieldContent(HEADER_CONTENT, row);
+ }
+
+ /**
+ * Loads the blob whose id is stored in the given column of the row.
+ *
+ * The write path stores DEFAULT_OBJECT_VALUE (null) in the content columns when no blob id is
+ * available, so a null column is read back as empty content instead of being handed to
+ * {@code BlobId.from} (whose handling of null is not visible here - presumably it rejects it).
+ *
+ * @param field name of the column holding the blob id
+ * @param row row of the message table
+ * @return a future of the blob bytes, or of an empty array when the column holds no blob id
+ */
+ private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
+     String blobId = row.getString(field);
+     if (blobId == null) {
+         return CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
+     }
+     return blobsDAO.read(BlobId.from(blobId));
+ }
+
+ /**
+ * Creates the result of a lookup that matched no row.
+ *
+ * @param id id and metadata of the message that was requested
+ * @return a result carrying the id and no message
+ */
+ public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
+ return new MessageResult(id, Optional.empty());
+ }
+
+ public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
+ return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
+ }
+
+ /**
+ * Outcome of the lookup of one message: the id and metadata that were requested, plus the
+ * message and its attachment representations when a row was found.
+ */
+ public static class MessageResult {
+     private final ComposedMessageIdWithMetaData metaData;
+     private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;
+
+     /**
+      * @param metaData id and metadata of the requested message
+      * @param message the message and its attachment representations, empty when not found
+      */
+     public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
+         this.metaData = metaData;
+         this.message = message;
+     }
+
+     /** @return id and metadata of the requested message */
+     public ComposedMessageIdWithMetaData getMetadata() {
+         return metaData;
+     }
+
+     /** @return true when the lookup matched a stored message */
+     public boolean isFound() {
+         return message.isPresent();
+     }
+
+     /**
+      * Gives the message that was found; callers are expected to check {@link #isFound()} first.
+      * The attachment stream held by the pair is a {@code java.util.stream.Stream} and can thus
+      * be consumed only once.
+      *
+      * @return the message and its attachment representations
+      * @throws java.util.NoSuchElementException when the lookup did not find the message
+      */
+     public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
+         // Same exception type as Optional.get(), but naming the message that is missing.
+         return message.orElseThrow(() ->
+             new java.util.NoSuchElementException("No message found for " + metaData));
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
deleted file mode 100644
index ae86d59..0000000
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.mail;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_CONTENT;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_OCTECTS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_START_OCTET;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FIELDS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FULL_CONTENT_OCTETS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADERS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADER_CONTENT;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.INTERNAL_DATE;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.METADATA;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.PROPERTIES;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TABLE_NAME;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.inject.Inject;
-import javax.mail.util.SharedByteArrayInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.CassandraConfiguration;
-import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
-import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.mailbox.cassandra.ids.BlobId;
-import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Attachments;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Properties;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.AttachmentId;
-import org.apache.james.mailbox.model.Cid;
-import org.apache.james.mailbox.model.ComposedMessageId;
-import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.MessageAttachment;
-import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
-import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.mailbox.store.mail.model.Property;
-import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalConverter;
-import org.apache.james.util.streams.JamesCollectors;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.UDTValue;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Bytes;
-
-public class CassandraMessageDAOV2 {
- public static final long DEFAULT_LONG_VALUE = 0L;
- public static final String DEFAULT_OBJECT_VALUE = null;
- private static final byte[] EMPTY_BYTE_ARRAY = {};
-
- private final CassandraAsyncExecutor cassandraAsyncExecutor;
- private final CassandraTypesProvider typesProvider;
- private final CassandraBlobsDAO blobsDAO;
- private final CassandraConfiguration configuration;
- private final PreparedStatement insert;
- private final PreparedStatement delete;
- private final PreparedStatement selectMetadata;
- private final PreparedStatement selectHeaders;
- private final PreparedStatement selectFields;
- private final PreparedStatement selectBody;
- private final Cid.CidParser cidParser;
-
- @Inject
- public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) {
- this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
- this.typesProvider = typesProvider;
- this.blobsDAO = blobsDAO;
- this.configuration = cassandraConfiguration;
- this.insert = prepareInsert(session);
- this.delete = prepareDelete(session);
- this.selectMetadata = prepareSelect(session, METADATA);
- this.selectHeaders = prepareSelect(session, HEADERS);
- this.selectFields = prepareSelect(session, FIELDS);
- this.selectBody = prepareSelect(session, BODY);
- this.cidParser = Cid.parser().relaxed();
- }
-
- @VisibleForTesting
- public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
- this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION);
- }
-
- private PreparedStatement prepareSelect(Session session, String[] fields) {
- return session.prepare(select(fields)
- .from(TABLE_NAME)
- .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
- }
-
- private PreparedStatement prepareInsert(Session session) {
- return session.prepare(insertInto(TABLE_NAME)
- .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
- .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
- .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
- .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
- .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
- .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
- .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
- .value(PROPERTIES, bindMarker(PROPERTIES))
- .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
- .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
- }
-
- private PreparedStatement prepareDelete(Session session) {
- return session.prepare(QueryBuilder.delete()
- .from(TABLE_NAME)
- .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
- }
-
- public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
- return saveContent(message).thenCompose(pair ->
- cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
- }
-
- private CompletableFuture<Pair<Optional<BlobId>, Optional<BlobId>>> saveContent(MailboxMessage message) throws MailboxException {
- try {
- return CompletableFutureUtil.combine(
- blobsDAO.save(
- IOUtils.toByteArray(
- message.getHeaderContent())),
- blobsDAO.save(
- IOUtils.toByteArray(
- message.getBodyContent())),
- Pair::of);
- } catch (IOException e) {
- throw new MailboxException("Error saving mail content", e);
- }
- }
-
- private BoundStatement boundWriteStatement(MailboxMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
- CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
- return insert.bind()
- .setUUID(MESSAGE_ID, messageId.get())
- .setTimestamp(INTERNAL_DATE, message.getInternalDate())
- .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
- .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
- .setLong(BODY_OCTECTS, message.getBodyOctets())
- .setString(BODY_CONTENT, pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
- .setString(HEADER_CONTENT, pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
- .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
- .setList(PROPERTIES, buildPropertiesUdt(message))
- .setList(ATTACHMENTS, buildAttachmentUdt(message));
- }
-
- private ImmutableList<UDTValue> buildAttachmentUdt(MailboxMessage message) {
- return message.getAttachments().stream()
- .map(this::toUDT)
- .collect(Guavate.toImmutableList());
- }
-
- private List<UDTValue> buildPropertiesUdt(List<Property> properties) {
- return properties.stream()
- .map(property -> typesProvider.getDefinedUserType(PROPERTIES)
- .newValue()
- .setString(Properties.NAMESPACE, property.getNamespace())
- .setString(Properties.NAME, property.getLocalName())
- .setString(Properties.VALUE, property.getValue()))
- .collect(Guavate.toImmutableList());
- }
-
- private UDTValue toUDT(MessageAttachment messageAttachment) {
- return typesProvider.getDefinedUserType(ATTACHMENTS)
- .newValue()
- .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
- .setString(Attachments.NAME, messageAttachment.getName().orNull())
- .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull())
- .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
- }
-
- private List<UDTValue> buildPropertiesUdt(MailboxMessage message) {
- return message.getProperties().stream()
- .map(x -> typesProvider.getDefinedUserType(PROPERTIES)
- .newValue()
- .setString(Properties.NAMESPACE, x.getNamespace())
- .setString(Properties.NAME, x.getLocalName())
- .setString(Properties.VALUE, x.getValue()))
- .collect(Guavate.toImmutableList());
- }
-
- public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return CompletableFutureUtil.chainAll(
- limit.applyOnStream(messageIds.stream().distinct())
- .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
- ids -> rowToMessages(fetchType, ids))
- .thenApply(stream -> stream.flatMap(Function.identity()));
- }
-
- private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
- return FluentFutureStream.of(
- ids.stream()
- .map(id -> retrieveRow(id, fetchType)
- .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
- .completableFuture();
- }
-
- private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
- CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
-
- return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
- .bind()
- .setUUID(MESSAGE_ID, cassandraMessageId.get()));
- }
-
- private CompletableFuture<MessageResult>
- message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
- ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
-
- if (rows.isExhausted()) {
- return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
- }
-
- Row row = rows.one();
- CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);
-
- return contentFuture.thenApply(content -> {
- MessageWithoutAttachment messageWithoutAttachment =
- new MessageWithoutAttachment(
- messageId.getMessageId(),
- row.getTimestamp(INTERNAL_DATE),
- row.getLong(FULL_CONTENT_OCTETS),
- row.getInt(BODY_START_OCTET),
- new SharedByteArrayInputStream(content),
- messageIdWithMetaData.getFlags(),
- getPropertyBuilder(row),
- messageId.getMailboxId(),
- messageId.getUid(),
- messageIdWithMetaData.getModSeq());
- return found(Pair.of(messageWithoutAttachment, getAttachments(row, fetchType)));
- });
- }
-
- private PropertyBuilder getPropertyBuilder(Row row) {
- PropertyBuilder property = new PropertyBuilder(
- row.getList(PROPERTIES, UDTValue.class).stream()
- .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
- .collect(Collectors.toList()));
- property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
- return property;
- }
-
- private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) {
- switch (fetchType) {
- case Full:
- case Body:
- List<UDTValue> udtValues = row.getList(ATTACHMENTS, UDTValue.class);
-
- return attachmentByIds(udtValues);
- default:
- return Stream.of();
- }
- }
-
- private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UDTValue> udtValues) {
- return udtValues.stream()
- .map(this::messageAttachmentByIdFrom);
- }
-
- private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue udtValue) {
- return MessageAttachmentRepresentation.builder()
- .attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
- .name(udtValue.getString(Attachments.NAME))
- .cid(OptionalConverter.fromGuava(
- cidParser.parse(udtValue.getString(CassandraMessageV2Table.Attachments.CID))))
- .isInline(udtValue.getBool(Attachments.IS_INLINE))
- .build();
- }
-
- private PreparedStatement retrieveSelect(FetchType fetchType) {
- switch (fetchType) {
- case Body:
- return selectBody;
- case Full:
- return selectFields;
- case Headers:
- return selectHeaders;
- case Metadata:
- return selectMetadata;
- default:
- throw new RuntimeException("Unknown FetchType " + fetchType);
- }
- }
-
- public CompletableFuture<Void> delete(CassandraMessageId messageId) {
- return cassandraAsyncExecutor.executeVoid(delete.bind()
- .setUUID(MESSAGE_ID, messageId.get()));
- }
-
- private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
- switch (fetchType) {
- case Full:
- return this::getFullContent;
- case Headers:
- return this::getHeaderContent;
- case Body:
- return row -> getBodyContent(row)
- .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
- case Metadata:
- return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
- default:
- throw new RuntimeException("Unknown FetchType " + fetchType);
- }
- }
-
- private CompletableFuture<byte[]> getFullContent(Row row) {
- return CompletableFutureUtil.combine(
- getHeaderContent(row),
- getBodyContent(row),
- Bytes::concat);
- }
-
- private CompletableFuture<byte[]> getBodyContent(Row row) {
- return getFieldContent(BODY_CONTENT, row);
- }
-
- private CompletableFuture<byte[]> getHeaderContent(Row row) {
- return getFieldContent(HEADER_CONTENT, row);
- }
-
- private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
- return blobsDAO.read(BlobId.from(row.getString(field)));
- }
-
- public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
- return new MessageResult(id, Optional.empty());
- }
-
- public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
- return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
- }
-
- public static class MessageResult {
- private final ComposedMessageIdWithMetaData metaData;
- private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;
-
- public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
- this.metaData = metaData;
- this.message = message;
- }
-
- public ComposedMessageIdWithMetaData getMetadata() {
- return metaData;
- }
-
- public boolean isFound() {
- return message.isPresent();
- }
-
- public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
- return message.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 868b382..c9e7880 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -63,7 +63,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private final CassandraMailboxDAO mailboxDAO;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMessageIdDAO messageIdDAO;
- private final CassandraMessageDAOV2 messageDAOV2;
+ private final CassandraMessageDAO messageDAO;
private final CassandraIndexTableHandler indexTableHandler;
private final ModSeqProvider modSeqProvider;
private final MailboxSession mailboxSession;
@@ -72,14 +72,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO,
- CassandraMessageDAOV2 messageDAOV2, CassandraIndexTableHandler indexTableHandler,
+ CassandraMessageDAO messageDAO, CassandraIndexTableHandler indexTableHandler,
ModSeqProvider modSeqProvider, MailboxSession mailboxSession, CassandraConfiguration cassandraConfiguration) {
this.mailboxMapper = mailboxMapper;
this.mailboxDAO = mailboxDAO;
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
- this.messageDAOV2 = messageDAOV2;
+ this.messageDAO = messageDAO;
this.indexTableHandler = indexTableHandler;
this.modSeqProvider = modSeqProvider;
this.mailboxSession = mailboxSession;
@@ -99,10 +99,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())))
.completableFuture()
.thenApply(stream -> stream.collect(Guavate.toImmutableList()))
- .thenCompose(composedMessageIds -> messageDAOV2.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
+ .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
.thenApply(stream -> stream
- .filter(CassandraMessageDAOV2.MessageResult::isFound)
- .map(CassandraMessageDAOV2.MessageResult::message))
+ .filter(CassandraMessageDAO.MessageResult::isFound)
+ .map(CassandraMessageDAO.MessageResult::message))
.thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType))
.thenCompose(this::filterMessagesWithExistingMailbox)
.join()
@@ -147,7 +147,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.flags(mailboxMessage.createFlags())
.modSeq(mailboxMessage.getModSeq())
.build();
- messageDAOV2.save(mailboxMessage)
+ messageDAO.save(mailboxMessage)
.thenCompose(voidValue -> CompletableFuture.allOf(
imapUidDAO.insert(composedMessageIdWithMetaData),
messageIdDAO.insert(composedMessageIdWithMetaData)))
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 074f336..cb3c583 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -73,7 +73,7 @@ public class CassandraMessageMapper implements MessageMapper {
private final CassandraModSeqProvider modSeqProvider;
private final MailboxSession mailboxSession;
private final CassandraUidProvider uidProvider;
- private final CassandraMessageDAOV2 messageDAOV2;
+ private final CassandraMessageDAO messageDAO;
private final CassandraMessageIdDAO messageIdDAO;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMailboxCounterDAO mailboxCounterDAO;
@@ -87,7 +87,7 @@ public class CassandraMessageMapper implements MessageMapper {
public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
MailboxSession mailboxSession, CassandraAttachmentMapper attachmentMapper,
- CassandraMessageDAOV2 messageDAOV2, CassandraMessageIdDAO messageIdDAO,
+ CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO,
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
@@ -95,7 +95,7 @@ public class CassandraMessageMapper implements MessageMapper {
this.uidProvider = uidProvider;
this.modSeqProvider = modSeqProvider;
this.mailboxSession = mailboxSession;
- this.messageDAOV2 = messageDAOV2;
+ this.messageDAO = messageDAO;
this.messageIdDAO = messageIdDAO;
this.imapUidDAO = imapUidDAO;
this.mailboxCounterDAO = mailboxCounterDAO;
@@ -178,10 +178,10 @@ public class CassandraMessageMapper implements MessageMapper {
}
private CompletableFuture<Stream<SimpleMailboxMessage>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
- return messageDAOV2.retrieveMessages(messageIds, fetchType, limit)
+ return messageDAO.retrieveMessages(messageIds, fetchType, limit)
.thenApply(steam -> steam
- .filter(CassandraMessageDAOV2.MessageResult::isFound)
- .map(CassandraMessageDAOV2.MessageResult::message))
+ .filter(CassandraMessageDAO.MessageResult::isFound)
+ .map(CassandraMessageDAO.MessageResult::message))
.thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType));
}
@@ -217,9 +217,9 @@ public class CassandraMessageMapper implements MessageMapper {
return FluentFutureStream.ofOptionals(
uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)))
.performOnAll(this::deleteUsingMailboxId)
- .thenFlatCompose(idWithMetadata -> messageDAOV2.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
- .filter(CassandraMessageDAOV2.MessageResult::isFound)
- .map(CassandraMessageDAOV2.MessageResult::message)
+ .thenFlatCompose(idWithMetadata -> messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
+ .filter(CassandraMessageDAO.MessageResult::isFound)
+ .map(CassandraMessageDAO.MessageResult::message)
.map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
.completableFuture();
}
@@ -373,7 +373,7 @@ public class CassandraMessageMapper implements MessageMapper {
private CompletableFuture<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return messageDAOV2.save(message)
+ return messageDAO.save(message)
.thenCompose(aVoid -> insertIds(message, mailboxId));
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index 3fcbe7f..0589cc6 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -33,7 +33,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -58,7 +58,7 @@ public class CassandraMailboxManagerProvider {
CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(session, messageIdFactory);
CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory);
CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(session);
- CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(session, cassandraTypesProvider, blobsDAO);
+ CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, cassandraTypesProvider, blobsDAO);
CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session);
CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(session);
CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider);
@@ -70,7 +70,7 @@ public class CassandraMailboxManagerProvider {
CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider,
modSeqProvider,
session,
- messageDAOV2,
+ messageDAO,
messageIdDAO,
imapUidDAO,
mailboxCounterDAO,
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index c945204..93eb618 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -67,7 +67,7 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage
@Override
public SubscriptionManager createSubscriptionManager() {
CassandraMessageIdToImapUidDAO imapUidDAO = null;
- CassandraMessageDAOV2 messageDAOV2 = null;
+ CassandraMessageDAO messageDAO = null;
CassandraMessageIdDAO messageIdDAO = null;
CassandraMailboxCounterDAO mailboxCounterDAO = null;
CassandraMailboxRecentsDAO mailboxRecentsDAO = null;
@@ -81,7 +81,7 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage
new CassandraUidProvider(cassandra.getConf()),
new CassandraModSeqProvider(cassandra.getConf()),
cassandra.getConf(),
- messageDAOV2,
+ messageDAO,
messageIdDAO,
imapUidDAO,
mailboxCounterDAO,
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index 24895ed..c38adf5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -32,7 +32,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -92,7 +92,7 @@ public class CassandraTestSystemFixture {
CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(cassandra.getConf(), messageIdFactory);
CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(cassandra.getConf(), messageIdFactory);
CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
- CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+ CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(cassandra.getConf());
CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(cassandra.getConf());
CassandraApplicableFlagDAO applicableFlagDAO = new CassandraApplicableFlagDAO(cassandra.getConf());
@@ -104,7 +104,7 @@ public class CassandraTestSystemFixture {
return new CassandraMailboxSessionMapperFactory(uidProvider,
modSeqProvider,
cassandra.getConf(),
- messageDAOV2,
+ messageDAO,
messageIdDAO,
imapUidDAO,
mailboxCounterDAO,
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
index 9bfe467..b904598 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
@@ -89,12 +89,12 @@ public class CassandraMailboxManagerAttachmentTest extends AbstractMailboxManage
CassandraDeletedMessageDAO deletedMessageDAO = new CassandraDeletedMessageDAO(cassandra.getConf());
CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
- CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+ CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
mailboxSessionMapperFactory = new CassandraMailboxSessionMapperFactory(
new CassandraUidProvider(cassandra.getConf()),
new CassandraModSeqProvider(cassandra.getConf()),
cassandra.getConf(),
- messageDAOV2,
+ messageDAO,
new CassandraMessageIdDAO(cassandra.getConf(), messageIdFactory),
new CassandraMessageIdToImapUidDAO(cassandra.getConf(), messageIdFactory),
new CassandraMailboxCounterDAO(cassandra.getConf()),
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
index 1d1f9c9..827e869 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
@@ -58,7 +58,6 @@ import com.google.common.collect.ImmutableList;
public class CassandraMapperProvider implements MapperProvider {
private static final Factory MESSAGE_ID_FACTORY = new CassandraMessageId.Factory();
- public static final int MAX_ACL_RETRY = 10;
private final CassandraCluster cassandra;
private final MessageUidProvider messageUidProvider;
@@ -112,12 +111,12 @@ public class CassandraMapperProvider implements MapperProvider {
CassandraFirstUnseenDAO firstUnseenDAO = new CassandraFirstUnseenDAO(cassandra.getConf());
CassandraDeletedMessageDAO deletedMessageDAO = new CassandraDeletedMessageDAO(cassandra.getConf());
CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
- CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+ CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
return new CassandraMailboxSessionMapperFactory(
new CassandraUidProvider(cassandra.getConf()),
cassandraModSeqProvider,
cassandra.getConf(),
- messageDAOV2,
+ messageDAO,
new CassandraMessageIdDAO(cassandra.getConf(), MESSAGE_ID_FACTORY),
new CassandraMessageIdToImapUidDAO(cassandra.getConf(), MESSAGE_ID_FACTORY),
new CassandraMailboxCounterDAO(cassandra.getConf()),
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
new file mode 100644
index 0000000..0fbfc71
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -0,0 +1,191 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
+import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+public class CassandraMessageDAOTest {
+ private static final int BODY_START = 16;
+ private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+ private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+ private static final MessageUid messageUid = MessageUid.of(1);
+
+ private CassandraCluster cassandra;
+
+ private CassandraMessageDAO testee;
+ private CassandraBlobsDAO blobsDAO;
+ private CassandraMessageId.Factory messageIdFactory;
+
+ private SimpleMailboxMessage message;
+ private CassandraMessageId messageId;
+ private ComposedMessageId composedMessageId;
+ private List<ComposedMessageIdWithMetaData> messageIds;
+
+ @Before
+ public void setUp() {
+ cassandra = CassandraCluster.create(new CassandraModuleComposite(new CassandraMessageModule(), new CassandraBlobModule()));
+ cassandra.ensureAllTables();
+
+ messageIdFactory = new CassandraMessageId.Factory();
+ messageId = messageIdFactory.generate();
+ blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
+ testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+
+ composedMessageId = new ComposedMessageId(MAILBOX_ID, messageId, messageUid);
+
+ messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
+ .composedMessageId(composedMessageId)
+ .flags(new Flags())
+ .modSeq(1)
+ .build());
+ }
+
+ @After
+ public void tearDown() {
+ cassandra.clearAllTables();
+ cassandra.close();
+ }
+
+ @Test
+ public void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
+ message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+ testee.save(message).join();
+
+ MessageWithoutAttachment attachmentRepresentation =
+ toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+
+ assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
+ .isEqualTo(0L);
+ }
+
+ @Test
+ public void saveShouldSaveTextualLineCount() throws Exception {
+ long textualLineCount = 10L;
+ PropertyBuilder propertyBuilder = new PropertyBuilder();
+ propertyBuilder.setTextualLineCount(textualLineCount);
+ message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder);
+
+ testee.save(message).join();
+
+ MessageWithoutAttachment attachmentRepresentation =
+ toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+
+ assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
+ }
+
+ @Test
+ public void saveShouldStoreMessageWithFullContent() throws Exception {
+ message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+ testee.save(message).join();
+
+ MessageWithoutAttachment attachmentRepresentation =
+ toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
+
+ assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+ .isEqualTo(CONTENT);
+ }
+
+ @Test
+ public void saveShouldStoreMessageWithBodyContent() throws Exception {
+ message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+ testee.save(message).join();
+
+ MessageWithoutAttachment attachmentRepresentation =
+ toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
+
+ byte[] expected = Bytes.concat(
+ new byte[BODY_START],
+ CONTENT.substring(BODY_START).getBytes(Charsets.UTF_8));
+ assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+ .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), Charsets.UTF_8));
+ }
+
+ @Test
+ public void saveShouldStoreMessageWithHeaderContent() throws Exception {
+ message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+ testee.save(message).join();
+
+ MessageWithoutAttachment attachmentRepresentation =
+ toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
+
+ assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+ .isEqualTo(CONTENT.substring(0, BODY_START));
+ }
+
+ private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
+ return SimpleMailboxMessage.builder()
+ .messageId(messageId)
+ .mailboxId(MAILBOX_ID)
+ .uid(messageUid)
+ .internalDate(new Date())
+ .bodyStartOctet(bodyStart)
+ .size(content.length())
+ .content(new SharedByteArrayInputStream(content.getBytes(Charsets.UTF_8)))
+ .flags(new Flags())
+ .propertyBuilder(propertyBuilder)
+ .build();
+ }
+
+ private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAO.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
+ return readOptional.join()
+ .map(CassandraMessageDAO.MessageResult::message)
+ .map(Pair::getLeft)
+ .findAny()
+ .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
deleted file mode 100644
index 5b98aa7..0000000
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.mail;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.ByteArrayInputStream;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
-import javax.mail.Flags;
-import javax.mail.util.SharedByteArrayInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.CassandraCluster;
-import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.cassandra.ids.CassandraId;
-import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
-import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
-import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
-import org.apache.james.mailbox.model.ComposedMessageId;
-import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Bytes;
-
-public class CassandraMessageDAOV2Test {
- private static final int BODY_START = 16;
- private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
- private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
- private static final MessageUid messageUid = MessageUid.of(1);
-
- private CassandraCluster cassandra;
-
- private CassandraMessageDAOV2 testee;
- private CassandraBlobsDAO blobsDAO;
- private CassandraMessageId.Factory messageIdFactory;
-
- private SimpleMailboxMessage message;
- private CassandraMessageId messageId;
- private ComposedMessageId composedMessageId;
- private List<ComposedMessageIdWithMetaData> messageIds;
-
- @Before
- public void setUp() {
- cassandra = CassandraCluster.create(new CassandraModuleComposite(new CassandraMessageModule(), new CassandraBlobModule()));
- cassandra.ensureAllTables();
-
- messageIdFactory = new CassandraMessageId.Factory();
- messageId = messageIdFactory.generate();
- blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
- testee = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
-
- composedMessageId = new ComposedMessageId(MAILBOX_ID, messageId, messageUid);
-
- messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
- .composedMessageId(composedMessageId)
- .flags(new Flags())
- .modSeq(1)
- .build());
- }
-
- @After
- public void tearDown() {
- cassandra.clearAllTables();
- cassandra.close();
- }
-
- @Test
- public void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
- message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
- testee.save(message).join();
-
- MessageWithoutAttachment attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
-
- assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
- .isEqualTo(0L);
- }
-
- @Test
- public void saveShouldSaveTextualLineCount() throws Exception {
- long textualLineCount = 10L;
- PropertyBuilder propertyBuilder = new PropertyBuilder();
- propertyBuilder.setTextualLineCount(textualLineCount);
- message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder);
-
- testee.save(message).join();
-
- MessageWithoutAttachment attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
-
- assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
- }
-
- @Test
- public void saveShouldStoreMessageWithFullContent() throws Exception {
- message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
- testee.save(message).join();
-
- MessageWithoutAttachment attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
-
- assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
- .isEqualTo(CONTENT);
- }
-
- @Test
- public void saveShouldStoreMessageWithBodyContent() throws Exception {
- message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
- testee.save(message).join();
-
- MessageWithoutAttachment attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
-
- byte[] expected = Bytes.concat(
- new byte[BODY_START],
- CONTENT.substring(BODY_START).getBytes(Charsets.UTF_8));
- assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
- .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), Charsets.UTF_8));
- }
-
- @Test
- public void saveShouldStoreMessageWithHeaderContent() throws Exception {
- message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
- testee.save(message).join();
-
- MessageWithoutAttachment attachmentRepresentation =
- toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
-
- assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
- .isEqualTo(CONTENT.substring(0, BODY_START));
- }
-
- private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
- return SimpleMailboxMessage.builder()
- .messageId(messageId)
- .mailboxId(MAILBOX_ID)
- .uid(messageUid)
- .internalDate(new Date())
- .bodyStartOctet(bodyStart)
- .size(content.length())
- .content(new SharedByteArrayInputStream(content.getBytes(Charsets.UTF_8)))
- .flags(new Flags())
- .propertyBuilder(propertyBuilder)
- .build();
- }
-
- private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAOV2.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
- return readOptional.join()
- .map(CassandraMessageDAOV2.MessageResult::message)
- .map(Pair::getLeft)
- .findAny()
- .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
----------------------------------------------------------------------
diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
index 75969a3..0ca4992 100644
--- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
+++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
@@ -37,7 +37,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -110,7 +110,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
CassandraTypesProvider typesProvider = new CassandraTypesProvider(mailboxModule, session);
CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
CassandraBlobsDAO cassandraBlobsDAO = new CassandraBlobsDAO(session);
- CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(session, typesProvider, cassandraBlobsDAO);
+ CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, typesProvider, cassandraBlobsDAO);
CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(session, messageIdFactory);
CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory);
CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session);
@@ -125,7 +125,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
uidProvider,
modSeqProvider,
session,
- messageDAOV2,
+ messageDAO,
messageIdDAO,
imapUidDAO,
mailboxCounterDAO,
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org