You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2018/01/25 15:36:39 UTC

[06/14] james-project git commit: JAMES-2291 Save mail metadata in a DAO

JAMES-2291 Save mail metadata in a DAO


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0806d784
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0806d784
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0806d784

Branch: refs/heads/master
Commit: 0806d784d0ae3a843644c71aa535d25597394313
Parents: 5afe3f6
Author: Raphael Ouazana <ra...@linagora.com>
Authored: Wed Jan 24 14:04:15 2018 +0100
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 25 16:28:45 2018 +0700

----------------------------------------------------------------------
 .../org/apache/mailet/PerRecipientHeaders.java  |   5 +
 .../mailrepository-cassandra/pom.xml            |  10 +
 .../CassandraMailRepositoryMailDAO.java         | 318 +++++++++++++++++++
 .../CassandraMailRepositoryMailDAOTest.java     | 158 +++++++++
 4 files changed, 491 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0806d784/mailet/api/src/main/java/org/apache/mailet/PerRecipientHeaders.java
----------------------------------------------------------------------
diff --git a/mailet/api/src/main/java/org/apache/mailet/PerRecipientHeaders.java b/mailet/api/src/main/java/org/apache/mailet/PerRecipientHeaders.java
index 2f2fc83..cb3fce7 100644
--- a/mailet/api/src/main/java/org/apache/mailet/PerRecipientHeaders.java
+++ b/mailet/api/src/main/java/org/apache/mailet/PerRecipientHeaders.java
@@ -33,12 +33,17 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 public class PerRecipientHeaders implements Serializable {
+
     private Multimap<MailAddress, Header> headersByRecipient;
 
     public PerRecipientHeaders() {
         headersByRecipient = ArrayListMultimap.create();
     }
 
+    public Multimap<MailAddress, Header> getHeadersByRecipient() {
+        return ArrayListMultimap.create(headersByRecipient);
+    }
+
     public Collection<MailAddress> getRecipientsWithSpecificHeaders() {
         return headersByRecipient.keySet();
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/0806d784/server/mailrepository/mailrepository-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml
index 4c4ef0f..d55228d 100644
--- a/server/mailrepository/mailrepository-cassandra/pom.xml
+++ b/server/mailrepository/mailrepository-cassandra/pom.xml
@@ -45,6 +45,16 @@
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>james-server-core</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/0806d784/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
new file mode 100644
index 0000000..2415ff9
--- /dev/null
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAO.java
@@ -0,0 +1,318 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.ATTRIBUTES;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.BODY_BLOB_ID;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.CONTENT_TABLE_NAME;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.ERROR_MESSAGE;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.HEADER_BLOB_ID;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.HEADER_NAME;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.HEADER_TYPE;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.HEADER_VALUE;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.LAST_UPDATED;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_KEY;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MAIL_PROPERTIES;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.MESSAGE_SIZE;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.PER_RECIPIENT_SPECIFIC_HEADERS;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.RECIPIENTS;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REMOTE_ADDR;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REMOTE_HOST;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.REPOSITORY_NAME;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.SENDER;
+import static org.apache.james.mailrepository.cassandra.MailRepositoryTable.STATE;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.AddressException;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.core.MailAddress;
+import org.apache.james.server.core.MailImpl;
+import org.apache.james.util.streams.Iterators;
+import org.apache.mailet.Mail;
+import org.apache.mailet.PerRecipientHeaders;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class CassandraMailRepositoryMailDAO {
+
+    private final CassandraAsyncExecutor executor;
+    private final PreparedStatement insertMail;
+    private final PreparedStatement deleteMail;
+    private final PreparedStatement selectMail;
+    private final BlobId.Factory blobIdFactory;
+    private final CassandraTypesProvider cassandraTypesProvider;
+
+    public CassandraMailRepositoryMailDAO(Session session, BlobId.Factory blobIdFactory,
+                                          CassandraTypesProvider cassandraTypesProvider) {
+        this.executor = new CassandraAsyncExecutor(session);
+
+        this.insertMail = prepareInsert(session);
+        this.deleteMail = prepareDelete(session);
+        this.selectMail = prepareSelect(session);
+        this.blobIdFactory = blobIdFactory;
+        this.cassandraTypesProvider = cassandraTypesProvider;
+    }
+
+    private PreparedStatement prepareDelete(Session session) {
+        return session.prepare(delete()
+            .from(CONTENT_TABLE_NAME)
+            .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME)))
+            .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(CONTENT_TABLE_NAME)
+            .value(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME))
+            .value(MAIL_KEY, bindMarker(MAIL_KEY))
+            .value(MESSAGE_SIZE, bindMarker(MESSAGE_SIZE))
+            .value(STATE, bindMarker(STATE))
+            .value(SENDER, bindMarker(SENDER))
+            .value(RECIPIENTS, bindMarker(RECIPIENTS))
+            .value(ATTRIBUTES, bindMarker(ATTRIBUTES))
+            .value(ERROR_MESSAGE, bindMarker(ERROR_MESSAGE))
+            .value(REMOTE_ADDR, bindMarker(REMOTE_ADDR))
+            .value(REMOTE_HOST, bindMarker(REMOTE_HOST))
+            .value(LAST_UPDATED, bindMarker(LAST_UPDATED))
+            .value(HEADER_BLOB_ID, bindMarker(HEADER_BLOB_ID))
+            .value(BODY_BLOB_ID, bindMarker(BODY_BLOB_ID))
+            .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS)));
+    }
+
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(
+            select(MAIL_PROPERTIES)
+                .from(CONTENT_TABLE_NAME)
+                .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME)))
+                .and(eq(MAIL_KEY, bindMarker(MAIL_KEY))));
+    }
+
+    public CompletableFuture<Void> store(String url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException {
+        return executor.executeVoid(insertMail.bind()
+            .setString(REPOSITORY_NAME, url)
+            .setString(MAIL_KEY, mail.getName())
+            .setString(HEADER_BLOB_ID, headerId.asString())
+            .setString(BODY_BLOB_ID, bodyId.asString())
+            .setString(STATE, mail.getState())
+            .setString(SENDER, Optional.ofNullable(mail.getSender())
+                .map(MailAddress::asString)
+                .orElse(null))
+            .setList(RECIPIENTS, asStringList(mail.getRecipients()))
+            .setString(ERROR_MESSAGE, mail.getErrorMessage())
+            .setString(REMOTE_ADDR, mail.getRemoteAddr())
+            .setString(REMOTE_HOST, mail.getRemoteHost())
+            .setLong(MESSAGE_SIZE, mail.getMessageSize())
+            .setTimestamp(LAST_UPDATED, mail.getLastUpdated())
+            .setMap(ATTRIBUTES, toRawAttributeMap(mail))
+            .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(mail.getPerRecipientSpecificHeaders()))
+        );
+    }
+
+    public CompletableFuture<Void> remove(String url, String key) {
+        return executor.executeVoid(deleteMail.bind()
+            .setString(REPOSITORY_NAME, url)
+            .setString(MAIL_KEY, key));
+    }
+
+    public CompletableFuture<Optional<MailDTO>> read(String url, String key) {
+        return executor.executeSingleRow(selectMail.bind()
+            .setString(REPOSITORY_NAME, url)
+            .setString(MAIL_KEY, key))
+            .thenApply(rowOptional -> rowOptional.map(this::toMail));
+    }
+
+    private MailDTO toMail(Row row) {
+        MailAddress sender = Optional.ofNullable(row.getString(SENDER))
+            .map(Throwing.function(MailAddress::new))
+            .orElse(null);
+        List<MailAddress> recipients = row.getList(RECIPIENTS, String.class)
+            .stream()
+            .map(Throwing.function(MailAddress::new))
+            .collect(Guavate.toImmutableList());
+        String state = row.getString(STATE);
+        String remoteAddr = row.getString(REMOTE_ADDR);
+        String remoteHost = row.getString(REMOTE_HOST);
+        String errorMessage = row.getString(ERROR_MESSAGE);
+        String name = row.getString(MAIL_KEY);
+        Date lastUpdated = row.getTimestamp(LAST_UPDATED);
+        Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class);
+        PerRecipientHeaders perRecipientHeaders = fromHeaderMap(row.getMap(PER_RECIPIENT_SPECIFIC_HEADERS, String.class, UDTValue.class));
+
+        MailImpl.Builder mailBuilder = MailImpl.builder()
+            .name(name)
+            .sender(sender)
+            .recipients(recipients)
+            .lastUpdated(lastUpdated)
+            .errorMessage(errorMessage)
+            .remoteHost(remoteHost)
+            .remoteAddr(remoteAddr)
+            .state(state)
+            .addAllHeadersForRecipients(perRecipientHeaders)
+            .attributes(toAttributes(rawAttributes));
+
+        return new MailDTO(mailBuilder,
+            blobIdFactory.from(row.getString(HEADER_BLOB_ID)),
+            blobIdFactory.from(row.getString(BODY_BLOB_ID)));
+    }
+
+    private Map<String, Serializable> toAttributes(Map<String, ByteBuffer> rowAttributes) {
+        return rowAttributes.entrySet()
+            .stream()
+            .map(entry -> Pair.of(entry.getKey(), fromByteBuffer(entry.getValue())))
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
+    }
+
+    private ImmutableList<String> asStringList(Collection<MailAddress> mailAddresses) {
+        return mailAddresses.stream().map(MailAddress::asString).collect(Guavate.toImmutableList());
+    }
+
+    private ImmutableMap<String, ByteBuffer> toRawAttributeMap(Mail mail) {
+        return Iterators.toStream(mail.getAttributeNames())
+            .map(name -> Pair.of(name, mail.getAttribute(name)))
+            .map(pair -> Pair.of(pair.getLeft(), toByteBuffer(pair.getRight())))
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
+    }
+
+    private ImmutableMap<String, UDTValue> toHeaderMap(PerRecipientHeaders perRecipientHeaders) {
+        return perRecipientHeaders.getHeadersByRecipient()
+            .asMap()
+            .entrySet()
+            .stream()
+            .flatMap(entry -> entry.getValue().stream().map(value -> Pair.of(entry.getKey(), value)))
+            .map(entry -> Pair.of(entry.getKey().asString(),
+                cassandraTypesProvider.getDefinedUserType(HEADER_TYPE)
+                    .newValue()
+                    .setString(HEADER_NAME, entry.getRight().getName())
+                    .setString(HEADER_VALUE, entry.getRight().getValue())))
+            .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight));
+    }
+
+    private PerRecipientHeaders fromHeaderMap(Map<String, UDTValue> rawMap) {
+        PerRecipientHeaders result = new PerRecipientHeaders();
+
+        rawMap.forEach((key, value) -> result.addHeaderForRecipient(PerRecipientHeaders.Header.builder()
+                .name(value.getString(HEADER_NAME))
+                .value(value.getString(HEADER_VALUE))
+                .build(),
+            toMailAddress(key)));
+        return result;
+    }
+
+    private ByteBuffer toByteBuffer(Serializable serializable) {
+        try {
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            new ObjectOutputStream(outputStream).writeObject(serializable);
+            return ByteBuffer.wrap(outputStream.toByteArray());
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    private Serializable fromByteBuffer(ByteBuffer byteBuffer) {
+        try {
+            byte[] data = new byte[byteBuffer.remaining()];
+            byteBuffer.get(data);
+            ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data));
+            return (Serializable) objectInputStream.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    private MailAddress toMailAddress(String rawValue) {
+        try {
+            return new MailAddress(rawValue);
+        } catch (AddressException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public class MailDTO {
+        private final MailImpl.Builder mailBuilder;
+        private final BlobId headerBlobId;
+        private final BlobId bodyBlobId;
+
+        public MailDTO(MailImpl.Builder mailBuilder, BlobId headerBlobId, BlobId bodyBlobId) {
+            this.mailBuilder = mailBuilder;
+            this.headerBlobId = headerBlobId;
+            this.bodyBlobId = bodyBlobId;
+        }
+
+        public MailImpl.Builder getMailBuilder() {
+            return mailBuilder;
+        }
+
+        public BlobId getHeaderBlobId() {
+            return headerBlobId;
+        }
+
+        public BlobId getBodyBlobId() {
+            return bodyBlobId;
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (o instanceof MailDTO) {
+                MailDTO mailDTO = (MailDTO) o;
+
+                return Objects.equals(this.mailBuilder.build(), mailDTO.mailBuilder.build())
+                    && Objects.equals(this.headerBlobId, mailDTO.headerBlobId)
+                    && Objects.equals(this.bodyBlobId, mailDTO.bodyBlobId);
+            }
+            return false;
+        }
+
+        @Override
+        public final int hashCode() {
+            return Objects.hash(mailBuilder.build(), headerBlobId, bodyBlobId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0806d784/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
new file mode 100644
index 0000000..056c1dd
--- /dev/null
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java
@@ -0,0 +1,158 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailrepository.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertAll;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.TestBlobId;
+import org.apache.mailet.Mail;
+import org.apache.mailet.PerRecipientHeaders;
+import org.apache.mailet.base.MailAddressFixture;
+import org.apache.mailet.base.test.FakeMail;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+@ExtendWith(DockerCassandraExtension.class)
+public class CassandraMailRepositoryMailDAOTest {
+
+    static final String URL = "url";
+    static final String KEY_1 = "key1";
+    static final TestBlobId.Factory BLOB_ID_FACTORY = new TestBlobId.Factory();
+
+    CassandraCluster cassandra;
+    CassandraMailRepositoryMailDAO testee;
+
+    @BeforeEach
+    public void setUp(DockerCassandraExtension.DockerCassandra dockerCassandra) {
+        cassandra = CassandraCluster.create(
+            new CassandraMailRepositoryModule(), dockerCassandra.getIp(), dockerCassandra.getBindingPort());
+
+        testee = new CassandraMailRepositoryMailDAO(cassandra.getConf(), BLOB_ID_FACTORY, cassandra.getTypesProvider());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        cassandra.close();
+    }
+
+    @Test
+    public void readShouldReturnEmptyWhenAbsent() {
+        assertThat(testee.read(URL, KEY_1).join())
+            .isEmpty();
+    }
+
+    @Test
+    public void readShouldReturnAllMailMetadata() throws Exception {
+        BlobId blobIdBody = BLOB_ID_FACTORY.from("blobHeader");
+        BlobId blobIdHeader = BLOB_ID_FACTORY.from("blobBody");
+        String errorMessage = "error message";
+        String state = "state";
+        String remoteAddr = "remoteAddr";
+        String remoteHost = "remoteHost";
+        PerRecipientHeaders.Header header = PerRecipientHeaders.Header.builder().name("headerName").value("headerValue").build();
+        String attributeName = "att1";
+        ImmutableList<String> attributeValue = ImmutableList.of("value1", "value2");
+
+        testee.store(URL,
+            FakeMail.builder()
+                .name(KEY_1)
+                .sender(MailAddressFixture.SENDER)
+                .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2)
+                .errorMessage(errorMessage)
+                .state(state)
+                .remoteAddr(remoteAddr)
+                .remoteHost(remoteHost)
+                .addHeaderForRecipient(header, MailAddressFixture.RECIPIENT1)
+                .attribute(attributeName, attributeValue)
+                .build(),
+            blobIdHeader,
+            blobIdBody)
+            .join();
+
+        CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get();
+
+        Mail partialMail = mailDTO.getMailBuilder().build();
+        assertAll(
+            () -> assertThat(mailDTO.getBodyBlobId()).isEqualTo(blobIdBody),
+            () -> assertThat(mailDTO.getHeaderBlobId()).isEqualTo(blobIdHeader),
+            () -> assertThat(partialMail.getName()).isEqualTo(KEY_1),
+            () -> assertThat(partialMail.getErrorMessage()).isEqualTo(errorMessage),
+            () -> assertThat(partialMail.getState()).isEqualTo(state),
+            () -> assertThat(partialMail.getRemoteAddr()).isEqualTo(remoteAddr),
+            () -> assertThat(partialMail.getRemoteHost()).isEqualTo(remoteHost),
+            () -> assertThat(partialMail.getAttributeNames()).containsOnly(attributeName),
+            () -> assertThat(partialMail.getAttribute(attributeName)).isEqualTo(attributeValue),
+            () -> assertThat(partialMail.getPerRecipientSpecificHeaders().getRecipientsWithSpecificHeaders())
+                .containsOnly(MailAddressFixture.RECIPIENT1),
+            () -> assertThat(partialMail.getPerRecipientSpecificHeaders().getHeadersForRecipient(MailAddressFixture.RECIPIENT1))
+                .containsOnly(header),
+            () -> assertThat(partialMail.getSender()).isEqualTo(MailAddressFixture.SENDER),
+            () -> assertThat(partialMail.getRecipients()).containsOnly(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2));
+    }
+
+    @Test
+    public void storeShouldAcceptMailWithOnlyName() throws Exception {
+        BlobId blobIdBody = BLOB_ID_FACTORY.from("blobHeader");
+        BlobId blobIdHeader = BLOB_ID_FACTORY.from("blobBody");
+
+        testee.store(URL,
+            FakeMail.builder()
+                .name(KEY_1)
+                .build(),
+            blobIdHeader,
+            blobIdBody)
+            .join();
+
+        CassandraMailRepositoryMailDAO.MailDTO mailDTO = testee.read(URL, KEY_1).join().get();
+
+        Mail partialMail = mailDTO.getMailBuilder().build();
+        assertAll(
+            () -> assertThat(mailDTO.getBodyBlobId()).isEqualTo(blobIdBody),
+            () -> assertThat(mailDTO.getHeaderBlobId()).isEqualTo(blobIdHeader),
+            () -> assertThat(partialMail.getName()).isEqualTo(KEY_1));
+    }
+
+    @Test
+    public void removeShouldDeleteMailMetaData() throws Exception {
+        BlobId blobIdBody = BLOB_ID_FACTORY.from("blobHeader");
+        BlobId blobIdHeader = BLOB_ID_FACTORY.from("blobBody");
+
+        testee.store(URL,
+            FakeMail.builder()
+                .name(KEY_1)
+                .build(),
+            blobIdHeader,
+            blobIdBody)
+            .join();
+
+        testee.remove(URL, KEY_1);
+
+        assertThat(testee.read(URL, KEY_1).join())
+            .isEmpty();
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org