You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/18 02:49:18 UTC
[10/10] james-project git commit: JAMES-2288 MemoryMailQueue should be manageable
JAMES-2288 MemoryMailQueue should be manageable
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/4247d887
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/4247d887
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/4247d887
Branch: refs/heads/master
Commit: 4247d887db11d95b3f400e7658b7c947ed9330df
Parents: bc28064
Author: benwa <bt...@linagora.com>
Authored: Tue Jan 16 11:27:31 2018 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 18 09:48:50 2018 +0700
----------------------------------------------------------------------
.../queue/memory/MemoryMailQueueFactory.java | 94 ++++++++++++++++++--
.../james/queue/memory/MemoryMailQueueTest.java | 10 ++-
2 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/4247d887/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 5c0a4e1..214e1d3 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -19,6 +19,7 @@
package org.apache.james.queue.memory;
+import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,14 +30,19 @@ import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.core.MailAddress;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.server.core.MailImpl;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
public class MemoryMailQueueFactory implements MailQueueFactory {
@@ -61,14 +67,15 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
.orElse(newMailQueue);
}
- public static class MemoryMailQueue implements MailQueue {
-
+ public static class MemoryMailQueue implements ManageableMailQueue {
private final LinkedBlockingDeque<MemoryMailQueueItem> mailItems;
+ private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
private final String name;
public MemoryMailQueue(String name,MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
this.mailItems = new LinkedBlockingDeque<>();
+ this.inProcessingMailItems = new LinkedBlockingDeque<>();
this.name = name;
this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
}
@@ -81,7 +88,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
@Override
public void enQueue(Mail mail) throws MailQueueException {
try {
- mailItems.addLast(new MemoryMailQueueItem(cloneMail(mail)));
+ mailItems.addLast(new MemoryMailQueueItem(cloneMail(mail), this));
} catch (MessagingException e) {
throw new MailQueueException("Error while copying mail " + mail.getName(), e);
}
@@ -101,6 +108,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
public MailQueueItem deQueue() throws MailQueueException, InterruptedException {
while (true) {
MemoryMailQueueItem item = mailItems.takeFirst();
+ inProcessingMailItems.add(item);
return mailQueueItemDecoratorFactory.decorate(item);
}
}
@@ -114,6 +122,76 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
}
@Override
+ public long getSize() throws MailQueueException {
+ return mailItems.size() + inProcessingMailItems.size();
+ }
+
+ @Override
+ public long flush() throws MailQueueException {
+ return 0;
+ }
+
+ @Override
+ public long clear() throws MailQueueException {
+ int size = mailItems.size();
+ mailItems.clear();
+ return size;
+ }
+
+ @Override
+ public long remove(Type type, String value) throws MailQueueException {
+ ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream()
+ .filter(item -> shouldRemove(item, type, value))
+ .collect(Guavate.toImmutableList());
+ toBeRemoved.forEach(mailItems::remove);
+ return toBeRemoved.size();
+ }
+
+ public boolean shouldRemove(MailQueueItem item, Type type, String value) {
+ switch (type) {
+ case Name:
+ return item.getMail().getName().equals(value);
+ case Recipient:
+ return item.getMail().getRecipients().stream()
+ .map(MailAddress::asString)
+ .anyMatch(value::equals);
+ case Sender:
+ return item.getMail().getSender()
+ .asString()
+ .equals(value);
+ default:
+ throw new NotImplementedException("Unknown type " + type);
+ }
+ }
+
+ private void markProcessingAsFinished(MemoryMailQueueItem item) {
+ inProcessingMailItems.remove(item);
+ }
+
+ @Override
+ public MailQueueIterator browse() throws MailQueueException {
+ Iterator<MailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
+ .stream()
+ .map(item -> new MailQueueItemView(item.getMail(), -1))
+ .iterator();
+
+ return new MailQueueIterator() {
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean hasNext() {
+ return underlying.hasNext();
+ }
+
+ @Override
+ public MailQueueItemView next() {
+ return underlying.next();
+ }
+ };
+ }
+
+ @Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
@@ -131,11 +209,12 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
}
public static class MemoryMailQueueItem implements MailQueue.MailQueueItem {
-
private final Mail mail;
+ private final MemoryMailQueue queue;
- public MemoryMailQueueItem(Mail mail) {
+ public MemoryMailQueueItem(Mail mail, MemoryMailQueue queue) {
this.mail = mail;
+ this.queue = queue;
}
@Override
@@ -145,7 +224,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
@Override
public void done(boolean success) throws MailQueue.MailQueueException {
-
+ queue.markProcessingAsFinished(this);
+ if (!success) {
+ queue.enQueue(mail);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/4247d887/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
index 7f6f0f9..f6d80a0 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
@@ -23,12 +23,13 @@ import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueContract;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class MemoryMailQueueTest implements MailQueueContract {
+public class MemoryMailQueueTest implements ManageableMailQueueContract {
private MemoryMailQueueFactory.MemoryMailQueue mailQueue;
@@ -84,4 +85,9 @@ public class MemoryMailQueueTest implements MailQueueContract {
assertThat(mailQueue.getLastMail().getName())
.isEqualTo("name2");
}
+
+ @Override
+ public ManageableMailQueue getManageableMailQueue() {
+ return mailQueue;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org