You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/01/18 02:49:18 UTC

[10/10] james-project git commit: JAMES-2288 MemoryMailQueue should be manageable

JAMES-2288 MemoryMailQueue should be manageable


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/4247d887
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/4247d887
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/4247d887

Branch: refs/heads/master
Commit: 4247d887db11d95b3f400e7658b7c947ed9330df
Parents: bc28064
Author: benwa <bt...@linagora.com>
Authored: Tue Jan 16 11:27:31 2018 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Jan 18 09:48:50 2018 +0700

----------------------------------------------------------------------
 .../queue/memory/MemoryMailQueueFactory.java    | 94 ++++++++++++++++++--
 .../james/queue/memory/MemoryMailQueueTest.java | 10 ++-
 2 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/4247d887/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 5c0a4e1..214e1d3 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.queue.memory;
 
+import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,14 +30,19 @@ import javax.inject.Inject;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.core.MailAddress;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.server.core.MailImpl;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 
 public class MemoryMailQueueFactory implements MailQueueFactory {
 
@@ -61,14 +67,15 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
             .orElse(newMailQueue);
     }
 
-    public static class MemoryMailQueue implements MailQueue {
-
+    public static class MemoryMailQueue implements ManageableMailQueue {
         private final LinkedBlockingDeque<MemoryMailQueueItem> mailItems;
+        private final LinkedBlockingDeque<MemoryMailQueueItem> inProcessingMailItems;
         private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
         private final String name;
 
         public MemoryMailQueue(String name,MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
             this.mailItems = new LinkedBlockingDeque<>();
+            this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
             this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
         }
@@ -81,7 +88,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
         @Override
         public void enQueue(Mail mail) throws MailQueueException {
             try {
-                mailItems.addLast(new MemoryMailQueueItem(cloneMail(mail)));
+                mailItems.addLast(new MemoryMailQueueItem(cloneMail(mail), this));
             } catch (MessagingException e) {
                 throw new MailQueueException("Error while copying mail " + mail.getName(), e);
             }
@@ -101,6 +108,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
         public MailQueueItem deQueue() throws MailQueueException, InterruptedException {
             while (true) {
                 MemoryMailQueueItem item = mailItems.takeFirst();
+                inProcessingMailItems.add(item);
                 return mailQueueItemDecoratorFactory.decorate(item);
             }
         }
@@ -114,6 +122,76 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
         }
 
         @Override
+        public long getSize() throws MailQueueException {
+            return mailItems.size() + inProcessingMailItems.size();
+        }
+
+        @Override
+        public long flush() throws MailQueueException {
+            return 0;
+        }
+
+        @Override
+        public long clear() throws MailQueueException {
+            int size = mailItems.size();
+            mailItems.clear();
+            return size;
+        }
+
+        @Override
+        public long remove(Type type, String value) throws MailQueueException {
+            ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream()
+                .filter(item -> shouldRemove(item, type, value))
+                .collect(Guavate.toImmutableList());
+            toBeRemoved.forEach(mailItems::remove);
+            return toBeRemoved.size();
+        }
+
+        public boolean shouldRemove(MailQueueItem item, Type type, String value) {
+            switch (type) {
+                case Name:
+                    return item.getMail().getName().equals(value);
+                case Recipient:
+                    return item.getMail().getRecipients().stream()
+                        .map(MailAddress::asString)
+                        .anyMatch(value::equals);
+                case Sender:
+                    return item.getMail().getSender()
+                        .asString()
+                        .equals(value);
+                default:
+                    throw new NotImplementedException("Unknown type " + type);
+            }
+        }
+
+        private void markProcessingAsFinished(MemoryMailQueueItem item) {
+            inProcessingMailItems.remove(item);
+        }
+
+        @Override
+        public MailQueueIterator browse() throws MailQueueException {
+            Iterator<MailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
+                .stream()
+                .map(item -> new MailQueueItemView(item.getMail(), -1))
+                .iterator();
+
+            return new MailQueueIterator() {
+                @Override
+                public void close() {}
+
+                @Override
+                public boolean hasNext() {
+                    return underlying.hasNext();
+                }
+
+                @Override
+                public MailQueueItemView next() {
+                    return underlying.next();
+                }
+            };
+        }
+
+        @Override
         public boolean equals(Object o) {
             if (o == null || getClass() != o.getClass()) {
                 return false;
@@ -131,11 +209,12 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
     }
 
     public static class MemoryMailQueueItem implements MailQueue.MailQueueItem {
-
         private final Mail mail;
+        private final MemoryMailQueue queue;
 
-        public MemoryMailQueueItem(Mail mail) {
+        public MemoryMailQueueItem(Mail mail, MemoryMailQueue queue) {
             this.mail = mail;
+            this.queue = queue;
         }
 
         @Override
@@ -145,7 +224,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory {
 
         @Override
         public void done(boolean success) throws MailQueue.MailQueueException {
-
+            queue.markProcessingAsFinished(this);
+            if (!success) {
+                queue.enQueue(mail);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/4247d887/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
index 7f6f0f9..f6d80a0 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryMailQueueTest.java
@@ -23,12 +23,13 @@ import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import org.apache.james.queue.api.MailQueue;
-import org.apache.james.queue.api.MailQueueContract;
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.api.ManageableMailQueueContract;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-public class MemoryMailQueueTest implements MailQueueContract {
+public class MemoryMailQueueTest implements ManageableMailQueueContract {
 
     private MemoryMailQueueFactory.MemoryMailQueue mailQueue;
 
@@ -84,4 +85,9 @@ public class MemoryMailQueueTest implements MailQueueContract {
         assertThat(mailQueue.getLastMail().getName())
             .isEqualTo("name2");
     }
+
+    @Override
+    public ManageableMailQueue getManageableMailQueue() {
+        return mailQueue;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org