You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/08/22 03:31:59 UTC

[4/9] james-project git commit: JAMES-2111 s/CassandraMessageDAOV2/CassandraMessageDAO/g

JAMES-2111 s/CassandraMessageDAOV2/CassandraMessageDAO/g


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/9360cb3a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/9360cb3a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/9360cb3a

Branch: refs/heads/master
Commit: 9360cb3a3ad6c0543c036beceb7b608d1f6f637d
Parents: 51a1311
Author: benwa <bt...@linagora.com>
Authored: Tue Aug 15 15:55:56 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Aug 22 10:31:07 2017 +0700

----------------------------------------------------------------------
 .../CassandraMailboxSessionMapperFactory.java   |  16 +-
 .../cassandra/mail/CassandraMessageDAO.java     | 398 +++++++++++++++++++
 .../cassandra/mail/CassandraMessageDAOV2.java   | 398 -------------------
 .../mail/CassandraMessageIdMapper.java          |  14 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  20 +-
 .../CassandraMailboxManagerProvider.java        |   6 +-
 .../CassandraSubscriptionManagerTest.java       |   6 +-
 .../cassandra/CassandraTestSystemFixture.java   |   6 +-
 .../CassandraMailboxManagerAttachmentTest.java  |   4 +-
 .../cassandra/mail/CassandraMapperProvider.java |   5 +-
 .../cassandra/mail/CassandraMessageDAOTest.java | 191 +++++++++
 .../mail/CassandraMessageDAOV2Test.java         | 191 ---------
 .../cassandra/host/CassandraHostSystem.java     |   6 +-
 .../modules/mailbox/CassandraMailboxModule.java |   4 +-
 14 files changed, 632 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index e02d5cd..020e6a1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -35,7 +35,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
@@ -63,7 +63,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final Session session;
     private final CassandraUidProvider uidProvider;
     private final CassandraModSeqProvider modSeqProvider;
-    private final CassandraMessageDAOV2 messageDAOV2;
+    private final CassandraMessageDAO messageDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMailboxCounterDAO mailboxCounterDAO;
@@ -79,7 +79,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
 
     @Inject
     public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
-                                                CassandraMessageDAOV2 messageDAOV2,
+                                                CassandraMessageDAO messageDAO,
                                                 CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
                                                 CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, CassandraMailboxDAO mailboxDAO,
                                                 CassandraMailboxPathDAO mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, CassandraApplicableFlagDAO applicableFlagDAO,
@@ -87,7 +87,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.session = session;
-        this.messageDAOV2 = messageDAOV2;
+        this.messageDAO = messageDAO;
         this.messageIdDAO = messageIdDAO;
         this.imapUidDAO = imapUidDAO;
         this.mailboxCounterDAO = mailboxCounterDAO;
@@ -111,7 +111,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         CassandraUidProvider uidProvider,
         CassandraModSeqProvider modSeqProvider,
         Session session,
-        CassandraMessageDAOV2 messageDAOV2,
+        CassandraMessageDAO messageDAO,
         CassandraMessageIdDAO messageIdDAO,
         CassandraMessageIdToImapUidDAO imapUidDAO,
         CassandraMailboxCounterDAO mailboxCounterDAO,
@@ -122,7 +122,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         CassandraApplicableFlagDAO applicableFlagDAO,
         CassandraDeletedMessageDAO deletedMesageDAO) {
 
-        this(uidProvider, modSeqProvider, session, messageDAOV2, messageIdDAO, imapUidDAO, mailboxCounterDAO,
+        this(uidProvider, modSeqProvider, session, messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO,
             mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO,
             CassandraUtils.WITH_DEFAULT_CONFIGURATION, CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
@@ -134,7 +134,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                           modSeqProvider,
                                           null,
                                           (CassandraAttachmentMapper) createAttachmentMapper(mailboxSession),
-                                          messageDAOV2,
+            messageDAO,
                                           messageIdDAO,
                                           imapUidDAO,
                                           mailboxCounterDAO,
@@ -150,7 +150,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException {
         return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO,
                 (CassandraAttachmentMapper) getAttachmentMapper(mailboxSession),
-                imapUidDAO, messageIdDAO, messageDAOV2, indexTableHandler, modSeqProvider, mailboxSession,
+                imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession,
                 cassandraConfiguration);
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
new file mode 100644
index 0000000..5129f3b
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -0,0 +1,398 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_CONTENT;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_OCTECTS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_START_OCTET;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FIELDS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FULL_CONTENT_OCTETS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADERS;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADER_CONTENT;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.INTERNAL_DATE;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.METADATA;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.PROPERTIES;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraConfiguration;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.mailbox.cassandra.ids.BlobId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Attachments;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Properties;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.AttachmentId;
+import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.Property;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.OptionalConverter;
+import org.apache.james.util.streams.JamesCollectors;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+public class CassandraMessageDAO {
+    public static final long DEFAULT_LONG_VALUE = 0L;
+    public static final String DEFAULT_OBJECT_VALUE = null;
+    private static final byte[] EMPTY_BYTE_ARRAY = {};
+
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final CassandraTypesProvider typesProvider;
+    private final CassandraBlobsDAO blobsDAO;
+    private final CassandraConfiguration configuration;
+    private final PreparedStatement insert;
+    private final PreparedStatement delete;
+    private final PreparedStatement selectMetadata;
+    private final PreparedStatement selectHeaders;
+    private final PreparedStatement selectFields;
+    private final PreparedStatement selectBody;
+    private final Cid.CidParser cidParser;
+
+    @Inject
+    public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) {
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.typesProvider = typesProvider;
+        this.blobsDAO = blobsDAO;
+        this.configuration = cassandraConfiguration;
+        this.insert = prepareInsert(session);
+        this.delete = prepareDelete(session);
+        this.selectMetadata = prepareSelect(session, METADATA);
+        this.selectHeaders = prepareSelect(session, HEADERS);
+        this.selectFields = prepareSelect(session, FIELDS);
+        this.selectBody = prepareSelect(session, BODY);
+        this.cidParser = Cid.parser().relaxed();
+    }
+
+    @VisibleForTesting
+    public CassandraMessageDAO(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
+        this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION);
+    }
+
+    private PreparedStatement prepareSelect(Session session, String[] fields) {
+        return session.prepare(select(fields)
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+            .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
+            .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
+            .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
+            .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
+            .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
+            .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
+            .value(PROPERTIES, bindMarker(PROPERTIES))
+            .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
+            .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
+    }
+
+    private PreparedStatement prepareDelete(Session session) {
+        return session.prepare(QueryBuilder.delete()
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+    }
+
+    public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
+        return saveContent(message).thenCompose(pair ->
+            cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
+    }
+
+    private CompletableFuture<Pair<Optional<BlobId>, Optional<BlobId>>> saveContent(MailboxMessage message) throws MailboxException {
+        try {
+            return CompletableFutureUtil.combine(
+                blobsDAO.save(
+                    IOUtils.toByteArray(
+                        message.getHeaderContent())),
+                blobsDAO.save(
+                    IOUtils.toByteArray(
+                        message.getBodyContent())),
+                Pair::of);
+        } catch (IOException e) {
+            throw new MailboxException("Error saving mail content", e);
+        }
+    }
+
+    private BoundStatement boundWriteStatement(MailboxMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
+        CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
+        return insert.bind()
+            .setUUID(MESSAGE_ID, messageId.get())
+            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+            .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
+            .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
+            .setLong(BODY_OCTECTS, message.getBodyOctets())
+            .setString(BODY_CONTENT, pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
+            .setString(HEADER_CONTENT, pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
+            .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
+            .setList(PROPERTIES, buildPropertiesUdt(message))
+            .setList(ATTACHMENTS, buildAttachmentUdt(message));
+    }
+
+    private ImmutableList<UDTValue> buildAttachmentUdt(MailboxMessage message) {
+        return message.getAttachments().stream()
+            .map(this::toUDT)
+            .collect(Guavate.toImmutableList());
+    }
+
+    private List<UDTValue> buildPropertiesUdt(List<Property> properties) {
+        return properties.stream()
+            .map(property -> typesProvider.getDefinedUserType(PROPERTIES)
+                .newValue()
+                .setString(Properties.NAMESPACE, property.getNamespace())
+                .setString(Properties.NAME, property.getLocalName())
+                .setString(Properties.VALUE, property.getValue()))
+            .collect(Guavate.toImmutableList());
+    }
+
+    private UDTValue toUDT(MessageAttachment messageAttachment) {
+        return typesProvider.getDefinedUserType(ATTACHMENTS)
+            .newValue()
+            .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
+            .setString(Attachments.NAME, messageAttachment.getName().orNull())
+            .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull())
+            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
+    }
+
+    private List<UDTValue> buildPropertiesUdt(MailboxMessage message) {
+        return message.getProperties().stream()
+            .map(x -> typesProvider.getDefinedUserType(PROPERTIES)
+                .newValue()
+                .setString(Properties.NAMESPACE, x.getNamespace())
+                .setString(Properties.NAME, x.getLocalName())
+                .setString(Properties.VALUE, x.getValue()))
+            .collect(Guavate.toImmutableList());
+    }
+
+    public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
+        return CompletableFutureUtil.chainAll(
+                limit.applyOnStream(messageIds.stream().distinct())
+                    .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
+            ids -> rowToMessages(fetchType, ids))
+            .thenApply(stream -> stream.flatMap(Function.identity()));
+    }
+
+    private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
+        return FluentFutureStream.of(
+            ids.stream()
+                .map(id -> retrieveRow(id, fetchType)
+                    .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
+            .completableFuture();
+    }
+
+    private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
+        CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
+
+        return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
+            .bind()
+            .setUUID(MESSAGE_ID, cassandraMessageId.get()));
+    }
+
+    private CompletableFuture<MessageResult>
+    message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
+        ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
+
+        if (rows.isExhausted()) {
+            return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
+        }
+
+        Row row = rows.one();
+        CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);
+
+        return contentFuture.thenApply(content -> {
+            MessageWithoutAttachment messageWithoutAttachment =
+                new MessageWithoutAttachment(
+                    messageId.getMessageId(),
+                    row.getTimestamp(INTERNAL_DATE),
+                    row.getLong(FULL_CONTENT_OCTETS),
+                    row.getInt(BODY_START_OCTET),
+                    new SharedByteArrayInputStream(content),
+                    messageIdWithMetaData.getFlags(),
+                    getPropertyBuilder(row),
+                    messageId.getMailboxId(),
+                    messageId.getUid(),
+                    messageIdWithMetaData.getModSeq());
+            return found(Pair.of(messageWithoutAttachment, getAttachments(row, fetchType)));
+        });
+    }
+
+    private PropertyBuilder getPropertyBuilder(Row row) {
+        PropertyBuilder property = new PropertyBuilder(
+            row.getList(PROPERTIES, UDTValue.class).stream()
+                .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
+                .collect(Collectors.toList()));
+        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
+        return property;
+    }
+
+    private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) {
+        switch (fetchType) {
+            case Full:
+            case Body:
+                List<UDTValue> udtValues = row.getList(ATTACHMENTS, UDTValue.class);
+
+                return attachmentByIds(udtValues);
+            default:
+                return Stream.of();
+        }
+    }
+
+    private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UDTValue> udtValues) {
+        return udtValues.stream()
+            .map(this::messageAttachmentByIdFrom);
+    }
+
+    private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue udtValue) {
+        return MessageAttachmentRepresentation.builder()
+            .attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
+            .name(udtValue.getString(Attachments.NAME))
+            .cid(OptionalConverter.fromGuava(
+                cidParser.parse(udtValue.getString(CassandraMessageV2Table.Attachments.CID))))
+            .isInline(udtValue.getBool(Attachments.IS_INLINE))
+            .build();
+    }
+
+    private PreparedStatement retrieveSelect(FetchType fetchType) {
+        switch (fetchType) {
+            case Body:
+                return selectBody;
+            case Full:
+                return selectFields;
+            case Headers:
+                return selectHeaders;
+            case Metadata:
+                return selectMetadata;
+            default:
+                throw new RuntimeException("Unknown FetchType " + fetchType);
+        }
+    }
+
+    public CompletableFuture<Void> delete(CassandraMessageId messageId) {
+        return cassandraAsyncExecutor.executeVoid(delete.bind()
+            .setUUID(MESSAGE_ID, messageId.get()));
+    }
+
+    private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
+        switch (fetchType) {
+            case Full:
+                return this::getFullContent;
+            case Headers:
+                return this::getHeaderContent;
+            case Body:
+                return row -> getBodyContent(row)
+                    .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+            case Metadata:
+                return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
+            default:
+                throw new RuntimeException("Unknown FetchType " + fetchType);
+        }
+    }
+
+    private CompletableFuture<byte[]> getFullContent(Row row) {
+        return CompletableFutureUtil.combine(
+            getHeaderContent(row),
+            getBodyContent(row),
+            Bytes::concat);
+    }
+
+    private CompletableFuture<byte[]> getBodyContent(Row row) {
+        return getFieldContent(BODY_CONTENT, row);
+    }
+
+    private CompletableFuture<byte[]> getHeaderContent(Row row) {
+        return getFieldContent(HEADER_CONTENT, row);
+    }
+
+    private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
+        return blobsDAO.read(BlobId.from(row.getString(field)));
+    }
+
+    public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
+        return new MessageResult(id, Optional.empty());
+    }
+
+    public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
+        return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
+    }
+
+    public static class MessageResult {
+        private final ComposedMessageIdWithMetaData metaData;
+        private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;
+
+        public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
+            this.metaData = metaData;
+            this.message = message;
+        }
+
+        public ComposedMessageIdWithMetaData getMetadata() {
+            return metaData;
+        }
+
+        public boolean isFound() {
+            return message.isPresent();
+        }
+
+        public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
+            return message.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
deleted file mode 100644
index ae86d59..0000000
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.mail;
-
-import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_CONTENT;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_OCTECTS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_START_OCTET;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FIELDS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FULL_CONTENT_OCTETS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADERS;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADER_CONTENT;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.INTERNAL_DATE;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.METADATA;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.PROPERTIES;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TABLE_NAME;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.inject.Inject;
-import javax.mail.util.SharedByteArrayInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.CassandraConfiguration;
-import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
-import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.mailbox.cassandra.ids.BlobId;
-import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Attachments;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Properties;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.AttachmentId;
-import org.apache.james.mailbox.model.Cid;
-import org.apache.james.mailbox.model.ComposedMessageId;
-import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.MessageAttachment;
-import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
-import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.apache.james.mailbox.store.mail.model.Property;
-import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
-import org.apache.james.util.CompletableFutureUtil;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalConverter;
-import org.apache.james.util.streams.JamesCollectors;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.UDTValue;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Bytes;
-
-public class CassandraMessageDAOV2 {
-    public static final long DEFAULT_LONG_VALUE = 0L;
-    public static final String DEFAULT_OBJECT_VALUE = null;
-    private static final byte[] EMPTY_BYTE_ARRAY = {};
-
-    private final CassandraAsyncExecutor cassandraAsyncExecutor;
-    private final CassandraTypesProvider typesProvider;
-    private final CassandraBlobsDAO blobsDAO;
-    private final CassandraConfiguration configuration;
-    private final PreparedStatement insert;
-    private final PreparedStatement delete;
-    private final PreparedStatement selectMetadata;
-    private final PreparedStatement selectHeaders;
-    private final PreparedStatement selectFields;
-    private final PreparedStatement selectBody;
-    private final Cid.CidParser cidParser;
-
-    @Inject
-    public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) {
-        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.typesProvider = typesProvider;
-        this.blobsDAO = blobsDAO;
-        this.configuration = cassandraConfiguration;
-        this.insert = prepareInsert(session);
-        this.delete = prepareDelete(session);
-        this.selectMetadata = prepareSelect(session, METADATA);
-        this.selectHeaders = prepareSelect(session, HEADERS);
-        this.selectFields = prepareSelect(session, FIELDS);
-        this.selectBody = prepareSelect(session, BODY);
-        this.cidParser = Cid.parser().relaxed();
-    }
-
-    @VisibleForTesting
-    public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
-        this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION);
-    }
-
-    private PreparedStatement prepareSelect(Session session, String[] fields) {
-        return session.prepare(select(fields)
-            .from(TABLE_NAME)
-            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
-    }
-
-    private PreparedStatement prepareInsert(Session session) {
-        return session.prepare(insertInto(TABLE_NAME)
-            .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
-            .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
-            .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
-            .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
-            .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
-            .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
-            .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
-            .value(PROPERTIES, bindMarker(PROPERTIES))
-            .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
-            .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
-    }
-
-    private PreparedStatement prepareDelete(Session session) {
-        return session.prepare(QueryBuilder.delete()
-            .from(TABLE_NAME)
-            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
-    }
-
-    public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
-        return saveContent(message).thenCompose(pair ->
-            cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
-    }
-
-    private CompletableFuture<Pair<Optional<BlobId>, Optional<BlobId>>> saveContent(MailboxMessage message) throws MailboxException {
-        try {
-            return CompletableFutureUtil.combine(
-                blobsDAO.save(
-                    IOUtils.toByteArray(
-                        message.getHeaderContent())),
-                blobsDAO.save(
-                    IOUtils.toByteArray(
-                        message.getBodyContent())),
-                Pair::of);
-        } catch (IOException e) {
-            throw new MailboxException("Error saving mail content", e);
-        }
-    }
-
-    private BoundStatement boundWriteStatement(MailboxMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
-        CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
-        return insert.bind()
-            .setUUID(MESSAGE_ID, messageId.get())
-            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
-            .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
-            .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
-            .setLong(BODY_OCTECTS, message.getBodyOctets())
-            .setString(BODY_CONTENT, pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
-            .setString(HEADER_CONTENT, pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
-            .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
-            .setList(PROPERTIES, buildPropertiesUdt(message))
-            .setList(ATTACHMENTS, buildAttachmentUdt(message));
-    }
-
-    private ImmutableList<UDTValue> buildAttachmentUdt(MailboxMessage message) {
-        return message.getAttachments().stream()
-            .map(this::toUDT)
-            .collect(Guavate.toImmutableList());
-    }
-
-    private List<UDTValue> buildPropertiesUdt(List<Property> properties) {
-        return properties.stream()
-            .map(property -> typesProvider.getDefinedUserType(PROPERTIES)
-                .newValue()
-                .setString(Properties.NAMESPACE, property.getNamespace())
-                .setString(Properties.NAME, property.getLocalName())
-                .setString(Properties.VALUE, property.getValue()))
-            .collect(Guavate.toImmutableList());
-    }
-
-    private UDTValue toUDT(MessageAttachment messageAttachment) {
-        return typesProvider.getDefinedUserType(ATTACHMENTS)
-            .newValue()
-            .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
-            .setString(Attachments.NAME, messageAttachment.getName().orNull())
-            .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull())
-            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
-    }
-
-    private List<UDTValue> buildPropertiesUdt(MailboxMessage message) {
-        return message.getProperties().stream()
-            .map(x -> typesProvider.getDefinedUserType(PROPERTIES)
-                .newValue()
-                .setString(Properties.NAMESPACE, x.getNamespace())
-                .setString(Properties.NAME, x.getLocalName())
-                .setString(Properties.VALUE, x.getValue()))
-            .collect(Guavate.toImmutableList());
-    }
-
-    public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return CompletableFutureUtil.chainAll(
-                limit.applyOnStream(messageIds.stream().distinct())
-                    .collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
-            ids -> rowToMessages(fetchType, ids))
-            .thenApply(stream -> stream.flatMap(Function.identity()));
-    }
-
-    private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
-        return FluentFutureStream.of(
-            ids.stream()
-                .map(id -> retrieveRow(id, fetchType)
-                    .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
-            .completableFuture();
-    }
-
-    private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
-        CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();
-
-        return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
-            .bind()
-            .setUUID(MESSAGE_ID, cassandraMessageId.get()));
-    }
-
-    private CompletableFuture<MessageResult>
-    message(ResultSet rows,ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
-        ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();
-
-        if (rows.isExhausted()) {
-            return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
-        }
-
-        Row row = rows.one();
-        CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);
-
-        return contentFuture.thenApply(content -> {
-            MessageWithoutAttachment messageWithoutAttachment =
-                new MessageWithoutAttachment(
-                    messageId.getMessageId(),
-                    row.getTimestamp(INTERNAL_DATE),
-                    row.getLong(FULL_CONTENT_OCTETS),
-                    row.getInt(BODY_START_OCTET),
-                    new SharedByteArrayInputStream(content),
-                    messageIdWithMetaData.getFlags(),
-                    getPropertyBuilder(row),
-                    messageId.getMailboxId(),
-                    messageId.getUid(),
-                    messageIdWithMetaData.getModSeq());
-            return found(Pair.of(messageWithoutAttachment, getAttachments(row, fetchType)));
-        });
-    }
-
-    private PropertyBuilder getPropertyBuilder(Row row) {
-        PropertyBuilder property = new PropertyBuilder(
-            row.getList(PROPERTIES, UDTValue.class).stream()
-                .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
-                .collect(Collectors.toList()));
-        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
-        return property;
-    }
-
-    private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) {
-        switch (fetchType) {
-            case Full:
-            case Body:
-                List<UDTValue> udtValues = row.getList(ATTACHMENTS, UDTValue.class);
-
-                return attachmentByIds(udtValues);
-            default:
-                return Stream.of();
-        }
-    }
-
-    private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UDTValue> udtValues) {
-        return udtValues.stream()
-            .map(this::messageAttachmentByIdFrom);
-    }
-
-    private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue udtValue) {
-        return MessageAttachmentRepresentation.builder()
-            .attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
-            .name(udtValue.getString(Attachments.NAME))
-            .cid(OptionalConverter.fromGuava(
-                cidParser.parse(udtValue.getString(CassandraMessageV2Table.Attachments.CID))))
-            .isInline(udtValue.getBool(Attachments.IS_INLINE))
-            .build();
-    }
-
-    private PreparedStatement retrieveSelect(FetchType fetchType) {
-        switch (fetchType) {
-            case Body:
-                return selectBody;
-            case Full:
-                return selectFields;
-            case Headers:
-                return selectHeaders;
-            case Metadata:
-                return selectMetadata;
-            default:
-                throw new RuntimeException("Unknown FetchType " + fetchType);
-        }
-    }
-
-    public CompletableFuture<Void> delete(CassandraMessageId messageId) {
-        return cassandraAsyncExecutor.executeVoid(delete.bind()
-            .setUUID(MESSAGE_ID, messageId.get()));
-    }
-
-    private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
-        switch (fetchType) {
-            case Full:
-                return this::getFullContent;
-            case Headers:
-                return this::getHeaderContent;
-            case Body:
-                return row -> getBodyContent(row)
-                    .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
-            case Metadata:
-                return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
-            default:
-                throw new RuntimeException("Unknown FetchType " + fetchType);
-        }
-    }
-
-    private CompletableFuture<byte[]> getFullContent(Row row) {
-        return CompletableFutureUtil.combine(
-            getHeaderContent(row),
-            getBodyContent(row),
-            Bytes::concat);
-    }
-
-    private CompletableFuture<byte[]> getBodyContent(Row row) {
-        return getFieldContent(BODY_CONTENT, row);
-    }
-
-    private CompletableFuture<byte[]> getHeaderContent(Row row) {
-        return getFieldContent(HEADER_CONTENT, row);
-    }
-
-    private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
-        return blobsDAO.read(BlobId.from(row.getString(field)));
-    }
-
-    public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
-        return new MessageResult(id, Optional.empty());
-    }
-
-    public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
-        return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
-    }
-
-    public static class MessageResult {
-        private final ComposedMessageIdWithMetaData metaData;
-        private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;
-
-        public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
-            this.metaData = metaData;
-            this.message = message;
-        }
-
-        public ComposedMessageIdWithMetaData getMetadata() {
-            return metaData;
-        }
-
-        public boolean isFound() {
-            return message.isPresent();
-        }
-
-        public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
-            return message.get();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 868b382..c9e7880 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -63,7 +63,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private final CassandraMailboxDAO mailboxDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
-    private final CassandraMessageDAOV2 messageDAOV2;
+    private final CassandraMessageDAO messageDAO;
     private final CassandraIndexTableHandler indexTableHandler;
     private final ModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
@@ -72,14 +72,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO,
-                                    CassandraMessageDAOV2 messageDAOV2, CassandraIndexTableHandler indexTableHandler,
+                                    CassandraMessageDAO messageDAO, CassandraIndexTableHandler indexTableHandler,
                                     ModSeqProvider modSeqProvider, MailboxSession mailboxSession, CassandraConfiguration cassandraConfiguration) {
 
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
-        this.messageDAOV2 = messageDAOV2;
+        this.messageDAO = messageDAO;
         this.indexTableHandler = indexTableHandler;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
@@ -99,10 +99,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
                 .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())))
             .completableFuture()
             .thenApply(stream -> stream.collect(Guavate.toImmutableList()))
-            .thenCompose(composedMessageIds -> messageDAOV2.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
+            .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
             .thenApply(stream -> stream
-                .filter(CassandraMessageDAOV2.MessageResult::isFound)
-                .map(CassandraMessageDAOV2.MessageResult::message))
+                .filter(CassandraMessageDAO.MessageResult::isFound)
+                .map(CassandraMessageDAO.MessageResult::message))
             .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType))
             .thenCompose(this::filterMessagesWithExistingMailbox)
             .join()
@@ -147,7 +147,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .flags(mailboxMessage.createFlags())
             .modSeq(mailboxMessage.getModSeq())
             .build();
-        messageDAOV2.save(mailboxMessage)
+        messageDAO.save(mailboxMessage)
             .thenCompose(voidValue -> CompletableFuture.allOf(
                 imapUidDAO.insert(composedMessageIdWithMetaData),
                 messageIdDAO.insert(composedMessageIdWithMetaData)))

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 074f336..cb3c583 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -73,7 +73,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private final CassandraModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
     private final CassandraUidProvider uidProvider;
-    private final CassandraMessageDAOV2 messageDAOV2;
+    private final CassandraMessageDAO messageDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMailboxCounterDAO mailboxCounterDAO;
@@ -87,7 +87,7 @@ public class CassandraMessageMapper implements MessageMapper {
 
     public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
                                   MailboxSession mailboxSession, CassandraAttachmentMapper attachmentMapper,
-                                  CassandraMessageDAOV2 messageDAOV2, CassandraMessageIdDAO messageIdDAO,
+                                  CassandraMessageDAO messageDAO, CassandraMessageIdDAO messageIdDAO,
                                   CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
                                   CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
                                   CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
@@ -95,7 +95,7 @@ public class CassandraMessageMapper implements MessageMapper {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
-        this.messageDAOV2 = messageDAOV2;
+        this.messageDAO = messageDAO;
         this.messageIdDAO = messageIdDAO;
         this.imapUidDAO = imapUidDAO;
         this.mailboxCounterDAO = mailboxCounterDAO;
@@ -178,10 +178,10 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private CompletableFuture<Stream<SimpleMailboxMessage>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
-        return messageDAOV2.retrieveMessages(messageIds, fetchType, limit)
+        return messageDAO.retrieveMessages(messageIds, fetchType, limit)
             .thenApply(steam -> steam
-                .filter(CassandraMessageDAOV2.MessageResult::isFound)
-                .map(CassandraMessageDAOV2.MessageResult::message))
+                .filter(CassandraMessageDAO.MessageResult::isFound)
+                .map(CassandraMessageDAO.MessageResult::message))
             .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType));
     }
 
@@ -217,9 +217,9 @@ public class CassandraMessageMapper implements MessageMapper {
         return FluentFutureStream.ofOptionals(
                 uidChunk.stream().map(uid -> retrieveComposedId(mailboxId, uid)))
             .performOnAll(this::deleteUsingMailboxId)
-            .thenFlatCompose(idWithMetadata -> messageDAOV2.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
-            .filter(CassandraMessageDAOV2.MessageResult::isFound)
-            .map(CassandraMessageDAOV2.MessageResult::message)
+            .thenFlatCompose(idWithMetadata -> messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
+            .filter(CassandraMessageDAO.MessageResult::isFound)
+            .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
             .completableFuture();
     }
@@ -373,7 +373,7 @@ public class CassandraMessageMapper implements MessageMapper {
 
     private CompletableFuture<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return messageDAOV2.save(message)
+        return messageDAO.save(message)
             .thenCompose(aVoid -> insertIds(message, mailboxId));
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
index 3fcbe7f..0589cc6 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerProvider.java
@@ -33,7 +33,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -58,7 +58,7 @@ public class CassandraMailboxManagerProvider {
         CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(session, messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory);
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(session);
-        CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(session, cassandraTypesProvider, blobsDAO);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, cassandraTypesProvider, blobsDAO);
         CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session);
         CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(session);
         CassandraMailboxDAO mailboxDAO = new CassandraMailboxDAO(session, cassandraTypesProvider);
@@ -70,7 +70,7 @@ public class CassandraMailboxManagerProvider {
         CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider,
             modSeqProvider,
             session,
-            messageDAOV2,
+            messageDAO,
             messageIdDAO,
             imapUidDAO,
             mailboxCounterDAO,

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index c945204..93eb618 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -30,7 +30,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -67,7 +67,7 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage
     @Override
     public SubscriptionManager createSubscriptionManager() {
         CassandraMessageIdToImapUidDAO imapUidDAO = null;
-        CassandraMessageDAOV2 messageDAOV2 = null;
+        CassandraMessageDAO messageDAO = null;
         CassandraMessageIdDAO messageIdDAO = null;
         CassandraMailboxCounterDAO mailboxCounterDAO = null;
         CassandraMailboxRecentsDAO mailboxRecentsDAO = null;
@@ -81,7 +81,7 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage
                 new CassandraUidProvider(cassandra.getConf()),
                 new CassandraModSeqProvider(cassandra.getConf()),
                 cassandra.getConf(),
-                messageDAOV2,
+                messageDAO,
                 messageIdDAO,
                 imapUidDAO,
                 mailboxCounterDAO,

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index 24895ed..c38adf5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -32,7 +32,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -92,7 +92,7 @@ public class CassandraTestSystemFixture {
         CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(cassandra.getConf(), messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(cassandra.getConf(), messageIdFactory);
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
-        CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
         CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(cassandra.getConf());
         CassandraMailboxRecentsDAO mailboxRecentsDAO = new CassandraMailboxRecentsDAO(cassandra.getConf());
         CassandraApplicableFlagDAO applicableFlagDAO = new CassandraApplicableFlagDAO(cassandra.getConf());
@@ -104,7 +104,7 @@ public class CassandraTestSystemFixture {
         return new CassandraMailboxSessionMapperFactory(uidProvider,
             modSeqProvider,
             cassandra.getConf(),
-            messageDAOV2,
+            messageDAO,
             messageIdDAO,
             imapUidDAO,
             mailboxCounterDAO,

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
index 9bfe467..b904598 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxManagerAttachmentTest.java
@@ -89,12 +89,12 @@ public class CassandraMailboxManagerAttachmentTest extends AbstractMailboxManage
         CassandraDeletedMessageDAO deletedMessageDAO = new CassandraDeletedMessageDAO(cassandra.getConf());
 
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
-        CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
         mailboxSessionMapperFactory = new CassandraMailboxSessionMapperFactory(
                 new CassandraUidProvider(cassandra.getConf()),
                 new CassandraModSeqProvider(cassandra.getConf()),
                 cassandra.getConf(),
-            messageDAOV2,
+            messageDAO,
                 new CassandraMessageIdDAO(cassandra.getConf(), messageIdFactory),
                 new CassandraMessageIdToImapUidDAO(cassandra.getConf(), messageIdFactory),
                 new CassandraMailboxCounterDAO(cassandra.getConf()),

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
index 1d1f9c9..827e869 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMapperProvider.java
@@ -58,7 +58,6 @@ import com.google.common.collect.ImmutableList;
 public class CassandraMapperProvider implements MapperProvider {
 
     private static final Factory MESSAGE_ID_FACTORY = new CassandraMessageId.Factory();
-    public static final int MAX_ACL_RETRY = 10;
 
     private final CassandraCluster cassandra;
     private final MessageUidProvider messageUidProvider;
@@ -112,12 +111,12 @@ public class CassandraMapperProvider implements MapperProvider {
         CassandraFirstUnseenDAO firstUnseenDAO = new CassandraFirstUnseenDAO(cassandra.getConf());
         CassandraDeletedMessageDAO deletedMessageDAO = new CassandraDeletedMessageDAO(cassandra.getConf());
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
-        CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
         return new CassandraMailboxSessionMapperFactory(
             new CassandraUidProvider(cassandra.getConf()),
             cassandraModSeqProvider,
             cassandra.getConf(),
-            messageDAOV2,
+            messageDAO,
             new CassandraMessageIdDAO(cassandra.getConf(), MESSAGE_ID_FACTORY),
             new CassandraMessageIdToImapUidDAO(cassandra.getConf(), MESSAGE_ID_FACTORY),
             new CassandraMailboxCounterDAO(cassandra.getConf()),

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
new file mode 100644
index 0000000..0fbfc71
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOTest.java
@@ -0,0 +1,191 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
+import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
+import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+public class CassandraMessageDAOTest {
+    private static final int BODY_START = 16;
+    private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+    private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+    private static final MessageUid messageUid = MessageUid.of(1);
+
+    private CassandraCluster cassandra;
+
+    private CassandraMessageDAO testee;
+    private CassandraBlobsDAO blobsDAO;
+    private CassandraMessageId.Factory messageIdFactory;
+
+    private SimpleMailboxMessage message;
+    private CassandraMessageId messageId;
+    private ComposedMessageId composedMessageId;
+    private List<ComposedMessageIdWithMetaData> messageIds;
+
+    @Before
+    public void setUp() {
+        cassandra = CassandraCluster.create(new CassandraModuleComposite(new CassandraMessageModule(), new CassandraBlobModule()));
+        cassandra.ensureAllTables();
+
+        messageIdFactory = new CassandraMessageId.Factory();
+        messageId = messageIdFactory.generate();
+        blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
+        testee = new CassandraMessageDAO(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
+
+        composedMessageId = new ComposedMessageId(MAILBOX_ID, messageId, messageUid);
+
+        messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(composedMessageId)
+                .flags(new Flags())
+                .modSeq(1)
+                .build());
+    }
+
+    @After
+    public void tearDown() {
+        cassandra.clearAllTables();
+        cassandra.close();
+    }
+
+    @Test
+    public void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+        testee.save(message).join();
+
+        MessageWithoutAttachment attachmentRepresentation =
+            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+
+        assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
+            .isEqualTo(0L);
+    }
+
+    @Test
+    public void saveShouldSaveTextualLineCount() throws Exception {
+        long textualLineCount = 10L;
+        PropertyBuilder propertyBuilder = new PropertyBuilder();
+        propertyBuilder.setTextualLineCount(textualLineCount);
+        message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder);
+
+        testee.save(message).join();
+
+        MessageWithoutAttachment attachmentRepresentation =
+            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
+
+        assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
+    }
+
+    @Test
+    public void saveShouldStoreMessageWithFullContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+        testee.save(message).join();
+
+        MessageWithoutAttachment attachmentRepresentation =
+            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
+
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+            .isEqualTo(CONTENT);
+    }
+
+    @Test
+    public void saveShouldStoreMessageWithBodyContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+        testee.save(message).join();
+
+        MessageWithoutAttachment attachmentRepresentation =
+            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
+
+        byte[] expected = Bytes.concat(
+            new byte[BODY_START],
+            CONTENT.substring(BODY_START).getBytes(Charsets.UTF_8));
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+            .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), Charsets.UTF_8));
+    }
+
+    @Test
+    public void saveShouldStoreMessageWithHeaderContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
+
+        testee.save(message).join();
+
+        MessageWithoutAttachment attachmentRepresentation =
+            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
+
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
+            .isEqualTo(CONTENT.substring(0, BODY_START));
+    }
+
+    private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
+        return SimpleMailboxMessage.builder()
+            .messageId(messageId)
+            .mailboxId(MAILBOX_ID)
+            .uid(messageUid)
+            .internalDate(new Date())
+            .bodyStartOctet(bodyStart)
+            .size(content.length())
+            .content(new SharedByteArrayInputStream(content.getBytes(Charsets.UTF_8)))
+            .flags(new Flags())
+            .propertyBuilder(propertyBuilder)
+            .build();
+    }
+
+    private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAO.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
+        return readOptional.join()
+            .map(CassandraMessageDAO.MessageResult::message)
+            .map(Pair::getLeft)
+            .findAny()
+            .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
deleted file mode 100644
index 5b98aa7..0000000
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.cassandra.mail;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.ByteArrayInputStream;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
-import javax.mail.Flags;
-import javax.mail.util.SharedByteArrayInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.backends.cassandra.CassandraCluster;
-import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.cassandra.ids.CassandraId;
-import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
-import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
-import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
-import org.apache.james.mailbox.model.ComposedMessageId;
-import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Bytes;
-
-public class CassandraMessageDAOV2Test {
-    private static final int BODY_START = 16;
-    private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
-    private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
-    private static final MessageUid messageUid = MessageUid.of(1);
-
-    private CassandraCluster cassandra;
-
-    private CassandraMessageDAOV2 testee;
-    private CassandraBlobsDAO blobsDAO;
-    private CassandraMessageId.Factory messageIdFactory;
-
-    private SimpleMailboxMessage message;
-    private CassandraMessageId messageId;
-    private ComposedMessageId composedMessageId;
-    private List<ComposedMessageIdWithMetaData> messageIds;
-
-    @Before
-    public void setUp() {
-        cassandra = CassandraCluster.create(new CassandraModuleComposite(new CassandraMessageModule(), new CassandraBlobModule()));
-        cassandra.ensureAllTables();
-
-        messageIdFactory = new CassandraMessageId.Factory();
-        messageId = messageIdFactory.generate();
-        blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
-        testee = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO);
-
-        composedMessageId = new ComposedMessageId(MAILBOX_ID, messageId, messageUid);
-
-        messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder()
-                .composedMessageId(composedMessageId)
-                .flags(new Flags())
-                .modSeq(1)
-                .build());
-    }
-
-    @After
-    public void tearDown() {
-        cassandra.clearAllTables();
-        cassandra.close();
-    }
-
-    @Test
-    public void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
-        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
-        testee.save(message).join();
-
-        MessageWithoutAttachment attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
-
-        assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount())
-            .isEqualTo(0L);
-    }
-
-    @Test
-    public void saveShouldSaveTextualLineCount() throws Exception {
-        long textualLineCount = 10L;
-        PropertyBuilder propertyBuilder = new PropertyBuilder();
-        propertyBuilder.setTextualLineCount(textualLineCount);
-        message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder);
-
-        testee.save(message).join();
-
-        MessageWithoutAttachment attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited()));
-
-        assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount);
-    }
-
-    @Test
-    public void saveShouldStoreMessageWithFullContent() throws Exception {
-        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
-        testee.save(message).join();
-
-        MessageWithoutAttachment attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited()));
-
-        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
-            .isEqualTo(CONTENT);
-    }
-
-    @Test
-    public void saveShouldStoreMessageWithBodyContent() throws Exception {
-        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
-        testee.save(message).join();
-
-        MessageWithoutAttachment attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited()));
-
-        byte[] expected = Bytes.concat(
-            new byte[BODY_START],
-            CONTENT.substring(BODY_START).getBytes(Charsets.UTF_8));
-        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
-            .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), Charsets.UTF_8));
-    }
-
-    @Test
-    public void saveShouldStoreMessageWithHeaderContent() throws Exception {
-        message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder());
-
-        testee.save(message).join();
-
-        MessageWithoutAttachment attachmentRepresentation =
-            toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited()));
-
-        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8))
-            .isEqualTo(CONTENT.substring(0, BODY_START));
-    }
-
-    private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder) {
-        return SimpleMailboxMessage.builder()
-            .messageId(messageId)
-            .mailboxId(MAILBOX_ID)
-            .uid(messageUid)
-            .internalDate(new Date())
-            .bodyStartOctet(bodyStart)
-            .size(content.length())
-            .content(new SharedByteArrayInputStream(content.getBytes(Charsets.UTF_8)))
-            .flags(new Flags())
-            .propertyBuilder(propertyBuilder)
-            .build();
-    }
-
-    private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAOV2.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException {
-        return readOptional.join()
-            .map(CassandraMessageDAOV2.MessageResult::message)
-            .map(Pair::getLeft)
-            .findAny()
-            .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty"));
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/9360cb3a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
----------------------------------------------------------------------
diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
index 75969a3..0ca4992 100644
--- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
+++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
@@ -37,7 +37,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxCounterDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
@@ -110,7 +110,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
         CassandraTypesProvider typesProvider = new CassandraTypesProvider(mailboxModule, session);
         CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
         CassandraBlobsDAO cassandraBlobsDAO = new CassandraBlobsDAO(session);
-        CassandraMessageDAOV2 messageDAOV2 = new CassandraMessageDAOV2(session, typesProvider, cassandraBlobsDAO);
+        CassandraMessageDAO messageDAO = new CassandraMessageDAO(session, typesProvider, cassandraBlobsDAO);
         CassandraMessageIdDAO messageIdDAO = new CassandraMessageIdDAO(session, messageIdFactory);
         CassandraMessageIdToImapUidDAO imapUidDAO = new CassandraMessageIdToImapUidDAO(session, messageIdFactory);
         CassandraMailboxCounterDAO mailboxCounterDAO = new CassandraMailboxCounterDAO(session);
@@ -125,7 +125,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
             uidProvider,
             modSeqProvider,
             session,
-            messageDAOV2,
+            messageDAO,
             messageIdDAO,
             imapUidDAO,
             mailboxCounterDAO,


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org