You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/26 03:28:55 UTC
[james-project] 15/28: JAMES-2794 Solve some "channel closed"
exceptions on topof RabbitMQ MailQueue
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 92f9e6543c09bc3fdd818b5598afff1039d1d73f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 19 14:41:45 2019 +0700
JAMES-2794 Solve some "channel closed" exceptions on topof RabbitMQ MailQueue
Doing the ack operations out of the stream caused some 'channel closed' exception
to arise.
---
.../org/apache/james/queue/api/ManageableMailQueueContract.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index c911e64..521bd44 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -40,6 +40,7 @@ import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.junit.jupiter.api.Test;
+import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
@@ -79,7 +80,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
default void dequeueShouldDecreaseQueueSize() throws Exception {
enQueue(defaultMail().name("name").build());
- Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(true);
+ Flux.from(getManageableMailQueue().deQueue())
+ .doOnNext(Throwing.consumer(item -> item.done(true)))
+ .blockFirst();
long size = getManageableMailQueue().getSize();
@@ -90,7 +93,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
default void noAckShouldNotDecreaseSize() throws Exception {
enQueue(defaultMail().name("name").build());
- Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(false);
+ Flux.from(getManageableMailQueue().deQueue())
+ .doOnNext(Throwing.consumer(item -> item.done(false)))
+ .blockFirst();
long size = getManageableMailQueue().getSize();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org