You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/26 03:28:55 UTC

[james-project] 15/28: JAMES-2794 Solve some "channel closed" exceptions on top of RabbitMQ MailQueue

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 92f9e6543c09bc3fdd818b5598afff1039d1d73f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 19 14:41:45 2019 +0700

    JAMES-2794 Solve some "channel closed" exceptions on top of RabbitMQ MailQueue
    
    Performing the ack operations outside of the stream caused some
    'channel closed' exceptions to arise.
---
 .../org/apache/james/queue/api/ManageableMailQueueContract.java  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index c911e64..521bd44 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -40,6 +40,7 @@ import org.apache.mailet.Mail;
 import org.apache.mailet.base.MailAddressFixture;
 import org.junit.jupiter.api.Test;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
@@ -79,7 +80,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void dequeueShouldDecreaseQueueSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(true);
+        Flux.from(getManageableMailQueue().deQueue())
+            .doOnNext(Throwing.consumer(item -> item.done(true)))
+            .blockFirst();
 
         long size = getManageableMailQueue().getSize();
 
@@ -90,7 +93,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void noAckShouldNotDecreaseSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(false);
+        Flux.from(getManageableMailQueue().deQueue())
+            .doOnNext(Throwing.consumer(item -> item.done(false)))
+            .blockFirst();
 
         long size = getManageableMailQueue().getSize();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org