You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/30 08:27:13 UTC

[james-project] 01/08: JAMES-3319 Effective blob deletion for RabbitMQMailQueue

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d69b48f1276910d8e3cdf13e677fcafc818b65f7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jul 27 16:14:03 2020 +0700

    JAMES-3319 Effective blob deletion for RabbitMQMailQueue
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  2 +-
 .../apache/james/queue/rabbitmq/MailLoader.java    |  5 +-
 .../james/queue/rabbitmq/MailWithEnqueueId.java    |  9 ++-
 .../queue/rabbitmq/view/api/DeleteCondition.java   | 13 +++-
 .../view/cassandra/CassandraMailQueueView.java     | 25 +++++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 77 ++++++++++++++++++++++
 .../rabbitmq/view/api/DeleteConditionTest.java     |  6 +-
 .../CassandraMailQueueViewTestFactory.java         |  4 +-
 8 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b477335..0241c46 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -132,7 +132,7 @@ class Dequeuer implements Closeable {
             if (success) {
                 dequeueMetric.increment();
                 response.ack();
-                mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId()));
+                mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()));
             } else {
                 response.nack(REQUEUE);
             }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index b724e60..ffe765b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -47,7 +47,10 @@ class MailLoader {
     Mono<MailWithEnqueueId> load(MailReferenceDTO dto) {
         return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory))
             .flatMap(mailReference -> buildMail(mailReference)
-                .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail)));
+                .map(mail -> new MailWithEnqueueId(
+                    mailReference.getEnqueueId(),
+                    mail,
+                    mailReference.getPartsId())));
     }
 
     private Mono<Mail> buildMail(MailReference mailReference) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
index f256fbf..a8edc9c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
@@ -19,15 +19,18 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.mailet.Mail;
 
 public class MailWithEnqueueId {
     private final EnqueueId enqueueId;
     private final Mail mail;
+    private final MimeMessagePartsId blobIds;
 
-    MailWithEnqueueId(EnqueueId enqueueId, Mail mail) {
+    MailWithEnqueueId(EnqueueId enqueueId, Mail mail, MimeMessagePartsId blobIds) {
         this.enqueueId = enqueueId;
         this.mail = mail;
+        this.blobIds = blobIds;
     }
 
     public EnqueueId getEnqueueId() {
@@ -37,4 +40,8 @@ public class MailWithEnqueueId {
     public Mail getMail() {
         return mail;
     }
+
+    public MimeMessagePartsId getBlobIds() {
+        return blobIds;
+    }
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
index 12aba7b..d0e8d04 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq.view.api;
 import java.util.Objects;
 
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
@@ -114,15 +115,21 @@ public interface DeleteCondition {
 
     class WithEnqueueId implements DeleteCondition {
         private final EnqueueId enqueueId;
+        private final MimeMessagePartsId blobIds;
 
-        WithEnqueueId(EnqueueId enqueueId) {
+        WithEnqueueId(EnqueueId enqueueId, MimeMessagePartsId blobIds) {
             this.enqueueId = enqueueId;
+            this.blobIds = blobIds;
         }
 
         public EnqueueId getEnqueueId() {
             return enqueueId;
         }
 
+        public MimeMessagePartsId getBlobIds() {
+            return blobIds;
+        }
+
         @Override
         public boolean shouldBeDeleted(EnqueuedItem enqueuedItem) {
             Preconditions.checkNotNull(enqueuedItem);
@@ -191,9 +198,9 @@ public interface DeleteCondition {
         return new WithName(value);
     }
 
-    static WithEnqueueId withEnqueueId(EnqueueId value) {
+    static WithEnqueueId withEnqueueId(EnqueueId value, MimeMessagePartsId blobIds) {
         Preconditions.checkNotNull(value);
-        return new WithEnqueueId(value);
+        return new WithEnqueueId(value, blobIds);
     }
 
     static DeleteCondition all() {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 72b26a3..4a9bad8 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -24,7 +24,11 @@ import static org.apache.james.util.FunctionalUtils.negate;
 import java.time.Instant;
 
 import javax.inject.Inject;
+import javax.mail.internet.MimeMessage;
 
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
@@ -45,40 +49,46 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
         private final CassandraMailQueueMailStore storeHelper;
         private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
         private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
+        private final MimeMessageStore.Factory mimeMessageStoreFactory;
 
         @Inject
         public Factory(CassandraMailQueueMailStore storeHelper,
                        CassandraMailQueueBrowser cassandraMailQueueBrowser,
                        CassandraMailQueueMailDelete cassandraMailQueueMailDelete,
                        EventsourcingConfigurationManagement eventsourcingConfigurationManagement,
+                       MimeMessageStore.Factory mimeMessageStoreFactory,
                        CassandraMailQueueViewConfiguration configuration) {
             this.storeHelper = storeHelper;
             this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
             this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+            this.mimeMessageStoreFactory = mimeMessageStoreFactory;
 
             eventsourcingConfigurationManagement.registerConfiguration(configuration);
         }
 
         @Override
         public MailQueueView create(MailQueueName mailQueueName) {
-            return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete);
+            return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete,
+                mimeMessageStoreFactory.mimeMessageStore());
         }
     }
 
     private final CassandraMailQueueMailStore storeHelper;
     private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
     private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
+    private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
 
     private final MailQueueName mailQueueName;
 
     CassandraMailQueueView(CassandraMailQueueMailStore storeHelper,
                            MailQueueName mailQueueName,
                            CassandraMailQueueBrowser cassandraMailQueueBrowser,
-                           CassandraMailQueueMailDelete cassandraMailQueueMailDelete) {
+                           CassandraMailQueueMailDelete cassandraMailQueueMailDelete, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore) {
         this.mailQueueName = mailQueueName;
         this.storeHelper = storeHelper;
         this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
         this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+        this.mimeMessageStore = mimeMessageStore;
     }
 
     @Override
@@ -123,7 +133,7 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
     public long delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
             DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition;
-            delete(enqueueIdCondition.getEnqueueId()).block();
+            delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block();
             return 1L;
         }
         return browseThenDelete(deleteCondition);
@@ -133,15 +143,18 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(deleteCondition::shouldBeDeleted)
-            .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName))
+            .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)
+                .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))))
             .count()
             .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
             .subscribeOn(Schedulers.elastic())
             .block();
     }
 
-    private Mono<Void> delete(EnqueueId enqueueId) {
-        return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName);
+    private Mono<Void> delete(EnqueueId enqueueId,
+                              MimeMessagePartsId blobIds) {
+        return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName)
+            .then(Mono.from(mimeMessageStore.delete(blobIds)));
     }
 
     @Override
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index a8b044d..2da8199 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -19,12 +19,16 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static java.time.temporal.ChronoUnit.HOURS;
 import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.queue.api.Mails.defaultMail;
+import static org.apache.james.queue.api.Mails.defaultMailNoRecipient;
+import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
+import static org.apache.mailet.base.MailAddressFixture.SENDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.mockito.ArgumentMatchers.any;
@@ -48,6 +52,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.BlobTables;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
 import org.apache.james.blob.mail.MimeMessageStore;
@@ -175,6 +180,78 @@ class RabbitMQMailQueueTest {
         }
 
         @Test
+        void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            dequeueFlux.take(1)
+                .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
+                    mailQueueItem.done(true);
+                    return mailQueueItem;
+                })).blockLast(Duration.ofSeconds(10));
+
+            assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
+        void clearShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getManageableMailQueue().clear();
+
+            assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
+        void removeByNameShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getManageableMailQueue().remove(ManageableMailQueue.Type.Name, name1);
+
+            assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
+        void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            getMailQueue().enQueue(defaultMailNoRecipient()
+                .name(name1)
+                .recipient(RECIPIENT1)
+                .build());
+
+            getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT1.asString());
+
+            assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
+        void removeBySenderShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .sender(SENDER)
+                .build());
+
+            getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, SENDER.asString());
+
+            assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+                .isEmpty();
+        }
+
+        @Test
         void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
             ManageableMailQueue mailQueue = getManageableMailQueue();
             int emailCount = 5;
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
index ddcd5da..2b800ed 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
@@ -80,7 +80,7 @@ class DeleteConditionTest {
         @Test
         void withSenderShouldThrowOnNullCondition() {
             assertThatThrownBy(() ->
-                DeleteCondition.withEnqueueId(null))
+                DeleteCondition.withEnqueueId(null, null))
                 .isInstanceOf(NullPointerException.class);
         }
 
@@ -97,7 +97,7 @@ class DeleteConditionTest {
                 .mimeMessagePartsId(MESSAGE_PARTS_ID)
                 .build();
 
-            assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem))
+            assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem))
                 .isTrue();
         }
 
@@ -114,7 +114,7 @@ class DeleteConditionTest {
                 .mimeMessagePartsId(MESSAGE_PARTS_ID)
                 .build();
 
-            assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem))
+            assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem))
                 .isFalse();
         }
     }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 1caf7aa..0d611a8 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,12 +25,9 @@ import java.util.Optional;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
-import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
 import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -66,6 +63,7 @@ public class CassandraMailQueueViewTestFactory {
             cassandraMailQueueBrowser,
             cassandraMailQueueMailDelete,
             eventsourcingConfigurationManagement,
+            mimeMessageStoreFactory,
             configuration);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org