You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/28 03:20:43 UTC

[james-project] 15/25: JAMES-3319 Actual Message & Attachment deletion for mailbox/cassandra

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f9a847739145a3baf11ba7aced4ba06157561295
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 24 16:52:27 2020 +0700

    JAMES-3319 Actual Message & Attachment deletion for mailbox/cassandra
---
 .../CassandraMailboxSessionMapperFactory.java      |  2 +-
 .../mailbox/cassandra/DeleteMessageListener.java   | 20 ++++++++++--
 .../cassandra/mail/CassandraMessageDAO.java        | 38 ++++++++++++----------
 .../cassandra/mail/MessageRepresentation.java      | 15 ++++++++-
 .../cassandra/CassandraMailboxManagerTest.java     | 24 ++++++++++++++
 5 files changed, 77 insertions(+), 22 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index def8f71..e51364c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -206,6 +206,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     public DeleteMessageListener deleteMessageListener() {
         return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, attachmentDAOV2, ownerDAO,
             attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
-            mailboxCounterDAO, mailboxRecentsDAO);
+            mailboxCounterDAO, mailboxRecentsDAO, blobStore);
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index e7e3627..d4a3ea7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -26,6 +26,7 @@ import java.util.function.Predicate;
 
 import javax.inject.Inject;
 
+import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
@@ -88,13 +89,14 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
     private final CassandraDeletedMessageDAO deletedMessageDAO;
     private final CassandraMailboxCounterDAO counterDAO;
     private final CassandraMailboxRecentsDAO recentsDAO;
+    private final BlobStore blobStore;
 
     @Inject
     public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                  CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
                                  CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
                                  CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
-                                 CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO) {
+                                 CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) {
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
         this.messageDAO = messageDAO;
@@ -108,6 +110,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
         this.deletedMessageDAO = deletedMessageDAO;
         this.counterDAO = counterDAO;
         this.recentsDAO = recentsDAO;
+        this.blobStore = blobStore;
     }
 
     @Override
@@ -172,6 +175,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
             .filterWhen(this::isReferenced)
             .flatMap(id -> readMessage(id)
                 .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
+                .flatMap(this::deleteMessageBlobs)
                 .flatMap(this::deleteAttachmentMessageIds)
                 .then(messageDAO.delete(messageId)));
     }
@@ -181,10 +185,19 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
             .filterWhen(id -> isReferenced(id, excludedId))
             .flatMap(id -> readMessage(id)
                 .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
+                .flatMap(this::deleteMessageBlobs)
                 .flatMap(this::deleteAttachmentMessageIds)
                 .then(messageDAO.delete(messageId)));
     }
 
+    private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) {
+        return Flux.merge(
+                blobStore.delete(blobStore.getDefaultBucketName(), message.getHeaderId()),
+                blobStore.delete(blobStore.getDefaultBucketName(), message.getBodyId()))
+            .then()
+            .thenReturn(message);
+    }
+
     private Mono<MessageRepresentation> readMessage(CassandraMessageId id) {
         return messageDAO.retrieveMessage(id, MessageMapper.FetchType.Metadata);
     }
@@ -193,7 +206,10 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
         return Flux.fromIterable(message.getAttachments())
             .filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate()))
             .filterWhen(attachment -> hasOtherMessagesReferences(message, attachment))
-            .concatMap(attachment -> attachmentDAO.delete(attachment.getAttachmentId()))
+            .concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId())
+                .map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId)
+                .flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)))
+                .then(attachmentDAO.delete(attachment.getAttachmentId())))
             .then();
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index a61d01f..7e7f06b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -245,7 +245,11 @@ public class CassandraMessageDAO {
         }
 
         Row row = rows.one();
-        return buildContentRetriever(fetchType, row).map(content ->
+        BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
+        BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
+        int bodyStartOctet = row.getInt(BODY_START_OCTET);
+
+        return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet).map(content ->
             new MessageRepresentation(
                 cassandraMessageId,
                 row.getTimestamp(INTERNAL_DATE),
@@ -253,7 +257,9 @@ public class CassandraMessageDAO {
                 row.getInt(BODY_START_OCTET),
                 new SharedByteArrayInputStream(content),
                 getPropertyBuilder(row),
-                getAttachments(row).collect(Guavate.toImmutableList())));
+                getAttachments(row).collect(Guavate.toImmutableList()),
+                headerId,
+                bodyId));
     }
 
     private PropertyBuilder getPropertyBuilder(Row row) {
@@ -293,15 +299,15 @@ public class CassandraMessageDAO {
             .setUUID(MESSAGE_ID, messageId.get()));
     }
 
-    private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) {
+    private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) {
         switch (fetchType) {
             case Full:
-                return getFullContent(row);
+                return getFullContent(headerId, bodyId);
             case Headers:
-                return getHeaderContent(row);
+                return getContent(headerId);
             case Body:
-                return getBodyContent(row)
-                    .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
+                return getContent(bodyId)
+                    .map(data -> Bytes.concat(new byte[bodyStartOctet], data));
             case Metadata:
                 return Mono.just(EMPTY_BYTE_ARRAY);
             default:
@@ -309,20 +315,16 @@ public class CassandraMessageDAO {
         }
     }
 
-    private Mono<byte[]> getFullContent(Row row) {
-        return getHeaderContent(row)
-            .zipWith(getBodyContent(row), Bytes::concat);
-    }
-
-    private Mono<byte[]> getBodyContent(Row row) {
-        return getFieldContent(BODY_CONTENT, row);
+    private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
+        return getContent(headerId)
+            .zipWith(getContent(bodyId), Bytes::concat);
     }
 
-    private Mono<byte[]> getHeaderContent(Row row) {
-        return getFieldContent(HEADER_CONTENT, row);
+    private Mono<byte[]> getContent(BlobId blobId) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId));
     }
 
-    private Mono<byte[]> getFieldContent(String field, Row row) {
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field))));
+    private BlobId retrieveBlobId(String field, Row row) {
+        return blobIdFactory.from(row.getString(field));
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
index 7a9a0b1..7ac496b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import javax.mail.util.SharedByteArrayInputStream;
 
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageAttachmentMetadata;
 import org.apache.james.mailbox.model.MessageId;
@@ -38,9 +39,11 @@ public class MessageRepresentation {
     private final SharedByteArrayInputStream content;
     private final PropertyBuilder propertyBuilder;
     private final List<MessageAttachmentRepresentation> attachments;
+    private final BlobId headerId;
+    private final BlobId bodyId;
 
     public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content,
-                                 PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments) {
+                                 PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) {
         this.messageId = messageId;
         this.internalDate = internalDate;
         this.size = size;
@@ -48,6 +51,8 @@ public class MessageRepresentation {
         this.content = content;
         this.propertyBuilder = propertyBuilder;
         this.attachments = attachments;
+        this.headerId = headerId;
+        this.bodyId = bodyId;
     }
 
     public SimpleMailboxMessage toMailboxMessage(ComposedMessageIdWithMetaData metadata, List<MessageAttachmentMetadata> attachments) {
@@ -81,4 +86,12 @@ public class MessageRepresentation {
     public List<MessageAttachmentRepresentation> getAttachments() {
         return attachments;
     }
+
+    public BlobId getHeaderId() {
+        return headerId;
+    }
+
+    public BlobId getBodyId() {
+        return bodyId;
+    }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 328fcc6..f2a98f2 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.BlobTables;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxManagerTest;
 import org.apache.james.mailbox.MailboxSession;
@@ -160,6 +162,28 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai
         }
 
         @Test
+        void deleteMailboxShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception {
+            inboxManager.appendMessage(MessageManager.AppendCommand.builder()
+                .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);
+
+            mailboxManager.deleteMailbox(inbox, session);
+
+            assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
+        void deleteMessageShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception {
+            AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder()
+                .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);
+
+            inboxManager.delete(ImmutableList.of(appendResult.getId().getUid()), session);
+
+            assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
         void deleteMailboxShouldEventuallyUnreferenceMessageMetadataWhenDeleteAttachmentFails(CassandraCluster cassandraCluster) throws Exception {
             AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder()
                 .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org