You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/07/30 08:27:13 UTC
[james-project] 01/08: JAMES-3319 Effective blob deletion for RabbitMQMailQueue
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d69b48f1276910d8e3cdf13e677fcafc818b65f7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jul 27 16:14:03 2020 +0700
JAMES-3319 Effective blob deletion for RabbitMQMailQueue
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 2 +-
.../apache/james/queue/rabbitmq/MailLoader.java | 5 +-
.../james/queue/rabbitmq/MailWithEnqueueId.java | 9 ++-
.../queue/rabbitmq/view/api/DeleteCondition.java | 13 +++-
.../view/cassandra/CassandraMailQueueView.java | 25 +++++--
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 77 ++++++++++++++++++++++
.../rabbitmq/view/api/DeleteConditionTest.java | 6 +-
.../CassandraMailQueueViewTestFactory.java | 4 +-
8 files changed, 123 insertions(+), 18 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b477335..0241c46 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -132,7 +132,7 @@ class Dequeuer implements Closeable {
if (success) {
dequeueMetric.increment();
response.ack();
- mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId()));
+ mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()));
} else {
response.nack(REQUEUE);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index b724e60..ffe765b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -47,7 +47,10 @@ class MailLoader {
Mono<MailWithEnqueueId> load(MailReferenceDTO dto) {
return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory))
.flatMap(mailReference -> buildMail(mailReference)
- .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail)));
+ .map(mail -> new MailWithEnqueueId(
+ mailReference.getEnqueueId(),
+ mail,
+ mailReference.getPartsId())));
}
private Mono<Mail> buildMail(MailReference mailReference) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
index f256fbf..a8edc9c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java
@@ -19,15 +19,18 @@
package org.apache.james.queue.rabbitmq;
+import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.mailet.Mail;
public class MailWithEnqueueId {
private final EnqueueId enqueueId;
private final Mail mail;
+ private final MimeMessagePartsId blobIds;
- MailWithEnqueueId(EnqueueId enqueueId, Mail mail) {
+ MailWithEnqueueId(EnqueueId enqueueId, Mail mail, MimeMessagePartsId blobIds) {
this.enqueueId = enqueueId;
this.mail = mail;
+ this.blobIds = blobIds;
}
public EnqueueId getEnqueueId() {
@@ -37,4 +40,8 @@ public class MailWithEnqueueId {
public Mail getMail() {
return mail;
}
+
+ public MimeMessagePartsId getBlobIds() {
+ return blobIds;
+ }
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
index 12aba7b..d0e8d04 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq.view.api;
import java.util.Objects;
import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
@@ -114,15 +115,21 @@ public interface DeleteCondition {
class WithEnqueueId implements DeleteCondition {
private final EnqueueId enqueueId;
+ private final MimeMessagePartsId blobIds;
- WithEnqueueId(EnqueueId enqueueId) {
+ WithEnqueueId(EnqueueId enqueueId, MimeMessagePartsId blobIds) {
this.enqueueId = enqueueId;
+ this.blobIds = blobIds;
}
public EnqueueId getEnqueueId() {
return enqueueId;
}
+ public MimeMessagePartsId getBlobIds() {
+ return blobIds;
+ }
+
@Override
public boolean shouldBeDeleted(EnqueuedItem enqueuedItem) {
Preconditions.checkNotNull(enqueuedItem);
@@ -191,9 +198,9 @@ public interface DeleteCondition {
return new WithName(value);
}
- static WithEnqueueId withEnqueueId(EnqueueId value) {
+ static WithEnqueueId withEnqueueId(EnqueueId value, MimeMessagePartsId blobIds) {
Preconditions.checkNotNull(value);
- return new WithEnqueueId(value);
+ return new WithEnqueueId(value, blobIds);
}
static DeleteCondition all() {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index 72b26a3..4a9bad8 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -24,7 +24,11 @@ import static org.apache.james.util.FunctionalUtils.negate;
import java.time.Instant;
import javax.inject.Inject;
+import javax.mail.internet.MimeMessage;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
@@ -45,40 +49,46 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
private final CassandraMailQueueMailStore storeHelper;
private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
+ private final MimeMessageStore.Factory mimeMessageStoreFactory;
@Inject
public Factory(CassandraMailQueueMailStore storeHelper,
CassandraMailQueueBrowser cassandraMailQueueBrowser,
CassandraMailQueueMailDelete cassandraMailQueueMailDelete,
EventsourcingConfigurationManagement eventsourcingConfigurationManagement,
+ MimeMessageStore.Factory mimeMessageStoreFactory,
CassandraMailQueueViewConfiguration configuration) {
this.storeHelper = storeHelper;
this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+ this.mimeMessageStoreFactory = mimeMessageStoreFactory;
eventsourcingConfigurationManagement.registerConfiguration(configuration);
}
@Override
public MailQueueView create(MailQueueName mailQueueName) {
- return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete);
+ return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete,
+ mimeMessageStoreFactory.mimeMessageStore());
}
}
private final CassandraMailQueueMailStore storeHelper;
private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete;
+ private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailQueueName mailQueueName;
CassandraMailQueueView(CassandraMailQueueMailStore storeHelper,
MailQueueName mailQueueName,
CassandraMailQueueBrowser cassandraMailQueueBrowser,
- CassandraMailQueueMailDelete cassandraMailQueueMailDelete) {
+ CassandraMailQueueMailDelete cassandraMailQueueMailDelete, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore) {
this.mailQueueName = mailQueueName;
this.storeHelper = storeHelper;
this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete;
+ this.mimeMessageStore = mimeMessageStore;
}
@Override
@@ -123,7 +133,7 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
public long delete(DeleteCondition deleteCondition) {
if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition;
- delete(enqueueIdCondition.getEnqueueId()).block();
+ delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block();
return 1L;
}
return browseThenDelete(deleteCondition);
@@ -133,15 +143,18 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
.filter(deleteCondition::shouldBeDeleted)
- .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName))
+ .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)
+ .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))))
.count()
.doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
.subscribeOn(Schedulers.elastic())
.block();
}
- private Mono<Void> delete(EnqueueId enqueueId) {
- return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName);
+ private Mono<Void> delete(EnqueueId enqueueId,
+ MimeMessagePartsId blobIds) {
+ return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName)
+ .then(Mono.from(mimeMessageStore.delete(blobIds)));
}
@Override
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index a8b044d..2da8199 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -19,12 +19,16 @@
package org.apache.james.queue.rabbitmq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static java.time.temporal.ChronoUnit.HOURS;
import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.queue.api.Mails.defaultMail;
+import static org.apache.james.queue.api.Mails.defaultMailNoRecipient;
+import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
+import static org.apache.mailet.base.MailAddressFixture.SENDER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.ArgumentMatchers.any;
@@ -48,6 +52,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.BlobTables;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
import org.apache.james.blob.mail.MimeMessageStore;
@@ -175,6 +180,78 @@ class RabbitMQMailQueueTest {
}
@Test
+ void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ dequeueFlux.take(1)
+ .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
+ mailQueueItem.done(true);
+ return mailQueueItem;
+ })).blockLast(Duration.ofSeconds(10));
+
+ assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
+ void clearShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getManageableMailQueue().clear();
+
+ assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
+ void removeByNameShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getManageableMailQueue().remove(ManageableMailQueue.Type.Name, name1);
+
+ assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
+ void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ getMailQueue().enQueue(defaultMailNoRecipient()
+ .name(name1)
+ .recipient(RECIPIENT1)
+ .build());
+
+ getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT1.asString());
+
+ assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
+ void removeBySenderShouldDeleteBlobs(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .sender(SENDER)
+ .build());
+
+ getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, SENDER.asString());
+
+ assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME)))
+ .isEmpty();
+ }
+
+ @Test
void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
ManageableMailQueue mailQueue = getManageableMailQueue();
int emailCount = 5;
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
index ddcd5da..2b800ed 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java
@@ -80,7 +80,7 @@ class DeleteConditionTest {
@Test
void withSenderShouldThrowOnNullCondition() {
assertThatThrownBy(() ->
- DeleteCondition.withEnqueueId(null))
+ DeleteCondition.withEnqueueId(null, null))
.isInstanceOf(NullPointerException.class);
}
@@ -97,7 +97,7 @@ class DeleteConditionTest {
.mimeMessagePartsId(MESSAGE_PARTS_ID)
.build();
- assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem))
+ assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem))
.isTrue();
}
@@ -114,7 +114,7 @@ class DeleteConditionTest {
.mimeMessagePartsId(MESSAGE_PARTS_ID)
.build();
- assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem))
+ assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem))
.isFalse();
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 1caf7aa..0d611a8 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -25,12 +25,9 @@ import java.util.Optional;
import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.mail.MimeMessageStore;
-import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
-import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule;
@@ -66,6 +63,7 @@ public class CassandraMailQueueViewTestFactory {
cassandraMailQueueBrowser,
cassandraMailQueueMailDelete,
eventsourcingConfigurationManagement,
+ mimeMessageStoreFactory,
configuration);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org