You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/28 03:20:43 UTC
[james-project] 15/25: JAMES-3319 Actual Message & Attachment
deletion for mailbox/cassandra
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f9a847739145a3baf11ba7aced4ba06157561295
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 24 16:52:27 2020 +0700
JAMES-3319 Actual Message & Attachment deletion for mailbox/cassandra
---
.../CassandraMailboxSessionMapperFactory.java | 2 +-
.../mailbox/cassandra/DeleteMessageListener.java | 20 ++++++++++--
.../cassandra/mail/CassandraMessageDAO.java | 38 ++++++++++++----------
.../cassandra/mail/MessageRepresentation.java | 15 ++++++++-
.../cassandra/CassandraMailboxManagerTest.java | 24 ++++++++++++++
5 files changed, 77 insertions(+), 22 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index def8f71..e51364c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -206,6 +206,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
public DeleteMessageListener deleteMessageListener() {
return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, attachmentDAOV2, ownerDAO,
attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
- mailboxCounterDAO, mailboxRecentsDAO);
+ mailboxCounterDAO, mailboxRecentsDAO, blobStore);
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index e7e3627..d4a3ea7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -26,6 +26,7 @@ import java.util.function.Predicate;
import javax.inject.Inject;
+import org.apache.james.blob.api.BlobStore;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
@@ -88,13 +89,14 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
private final CassandraDeletedMessageDAO deletedMessageDAO;
private final CassandraMailboxCounterDAO counterDAO;
private final CassandraMailboxRecentsDAO recentsDAO;
+ private final BlobStore blobStore;
@Inject
public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
- CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO) {
+ CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) {
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
this.messageDAO = messageDAO;
@@ -108,6 +110,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
this.deletedMessageDAO = deletedMessageDAO;
this.counterDAO = counterDAO;
this.recentsDAO = recentsDAO;
+ this.blobStore = blobStore;
}
@Override
@@ -172,6 +175,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
.filterWhen(this::isReferenced)
.flatMap(id -> readMessage(id)
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
+ .flatMap(this::deleteMessageBlobs)
.flatMap(this::deleteAttachmentMessageIds)
.then(messageDAO.delete(messageId)));
}
@@ -181,10 +185,19 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
.filterWhen(id -> isReferenced(id, excludedId))
.flatMap(id -> readMessage(id)
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
+ .flatMap(this::deleteMessageBlobs)
.flatMap(this::deleteAttachmentMessageIds)
.then(messageDAO.delete(messageId)));
}
+ private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) {
+ return Flux.merge(
+ blobStore.delete(blobStore.getDefaultBucketName(), message.getHeaderId()),
+ blobStore.delete(blobStore.getDefaultBucketName(), message.getBodyId()))
+ .then()
+ .thenReturn(message);
+ }
+
private Mono<MessageRepresentation> readMessage(CassandraMessageId id) {
return messageDAO.retrieveMessage(id, MessageMapper.FetchType.Metadata);
}
@@ -193,7 +206,10 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
return Flux.fromIterable(message.getAttachments())
.filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()))
.filterWhen(attachment -> hasOtherMessagesReferences(message, attachment))
- .concatMap(attachment -> attachmentDAO.delete(attachment.getAttachmentId()))
+ .concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId())
+ .map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId)
+ .flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)))
+ .then(attachmentDAO.delete(attachment.getAttachmentId())))
.then();
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index a61d01f..7e7f06b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -245,7 +245,11 @@ public class CassandraMessageDAO {
}
Row row = rows.one();
- return buildContentRetriever(fetchType, row).map(content ->
+ BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
+ BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
+ int bodyStartOctet = row.getInt(BODY_START_OCTET);
+
+ return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet).map(content ->
new MessageRepresentation(
cassandraMessageId,
row.getTimestamp(INTERNAL_DATE),
@@ -253,7 +257,9 @@ public class CassandraMessageDAO {
row.getInt(BODY_START_OCTET),
new SharedByteArrayInputStream(content),
getPropertyBuilder(row),
- getAttachments(row).collect(Guavate.toImmutableList())));
+ getAttachments(row).collect(Guavate.toImmutableList()),
+ headerId,
+ bodyId));
}
private PropertyBuilder getPropertyBuilder(Row row) {
@@ -293,15 +299,15 @@ public class CassandraMessageDAO {
.setUUID(MESSAGE_ID, messageId.get()));
}
- private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) {
+ private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
switch (fetchType) {
case Full:
- return getFullContent(row);
+ return getFullContent(headerId, bodyId);
case Headers:
- return getHeaderContent(row);
+ return getContent(headerId);
case Body:
- return getBodyContent(row)
- .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+ return getContent(bodyId)
+ .map(data -> Bytes.concat(new byte[bodyStartOctet], data));
case Metadata:
return Mono.just(EMPTY_BYTE_ARRAY);
default:
@@ -309,20 +315,16 @@ public class CassandraMessageDAO {
}
}
- private Mono<byte[]> getFullContent(Row row) {
- return getHeaderContent(row)
- .zipWith(getBodyContent(row), Bytes::concat);
- }
-
- private Mono<byte[]> getBodyContent(Row row) {
- return getFieldContent(BODY_CONTENT, row);
+ private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
+ return getContent(headerId)
+ .zipWith(getContent(bodyId), Bytes::concat);
}
- private Mono<byte[]> getHeaderContent(Row row) {
- return getFieldContent(HEADER_CONTENT, row);
+ private Mono<byte[]> getContent(BlobId blobId) {
+ return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId));
}
- private Mono<byte[]> getFieldContent(String field, Row row) {
- return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field))));
+ private BlobId retrieveBlobId(String field, Row row) {
+ return blobIdFactory.from(row.getString(field));
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
index 7a9a0b1..7ac496b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
@@ -24,6 +24,7 @@ import java.util.List;
import javax.mail.util.SharedByteArrayInputStream;
+import org.apache.james.blob.api.BlobId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
import org.apache.james.mailbox.model.MessageAttachmentMetadata;
import org.apache.james.mailbox.model.MessageId;
@@ -38,9 +39,11 @@ public class MessageRepresentation {
private final SharedByteArrayInputStream content;
private final PropertyBuilder propertyBuilder;
private final List<MessageAttachmentRepresentation> attachments;
+ private final BlobId headerId;
+ private final BlobId bodyId;
public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
- PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments) {
+ PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) {
this.messageId = messageId;
this.internalDate = internalDate;
this.size = size;
@@ -48,6 +51,8 @@ public class MessageRepresentation {
this.content = content;
this.propertyBuilder = propertyBuilder;
this.attachments = attachments;
+ this.headerId = headerId;
+ this.bodyId = bodyId;
}
public SimpleMailboxMessage toMailboxMessage(ComposedMessageIdWithMetaData metadata, List<MessageAttachmentMetadata> attachments) {
@@ -81,4 +86,12 @@ public class MessageRepresentation {
public List<MessageAttachmentRepresentation> getAttachments() {
return attachments;
}
+
+ public BlobId getHeaderId() {
+ return headerId;
+ }
+
+ public BlobId getBodyId() {
+ return bodyId;
+ }
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 328fcc6..f2a98f2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.mailbox.cassandra;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.BlobTables;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxManagerTest;
import org.apache.james.mailbox.MailboxSession;
@@ -160,6 +162,28 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai
}
@Test
+ void deleteMailboxShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception {
+ inboxManager.appendMessage(MessageManager.AppendCommand.builder()
+ .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);
+
+ mailboxManager.deleteMailbox(inbox, session);
+
+ assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
+ void deleteMessageShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception {
+ AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder()
+ .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);
+
+ inboxManager.delete(ImmutableList.of(appendResult.getId().getUid()), session);
+
+ assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
void deleteMailboxShouldEventuallyUnreferenceMessageMetadataWhenDeleteAttachmentFails(CassandraCluster cassandraCluster) throws Exception {
AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder()
.build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org