You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/16 09:54:25 UTC

[james-project] branch master updated: JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 399f0dab68 JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)
399f0dab68 is described below

commit 399f0dab68ddbbfa3e561ce1503f09b2b583e977
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 16 16:54:20 2022 +0700

    JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)
    
    ```
    Error Message
    
    Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
    
    Stacktrace
    
    java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
    
    Standard Output
    
    10:19:36.780 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
    java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
            at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
            at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:126)
            at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:120)
            at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
            at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
            at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
            at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
            at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
            at akka.actor.Actor.aroundReceive(Actor.scala:537)
            at akka.actor.Actor.aroundReceive$(Actor.scala:535)
            at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
            at akka.actor.ActorCell.invoke(ActorCell.scala:548)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
            at akka.dispatch.Mailbox.run(Mailbox.scala:231)
            at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
            at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
            at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
            at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
            at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
            at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    10:19:36.780 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
    java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
            at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
            at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:126)
            at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:120)
            at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
            at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
            at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
            at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
            at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
            at akka.actor.Actor.aroundReceive(Actor.scala:537)
            at akka.actor.Actor.aroundReceive$(Actor.scala:535)
            at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
            at akka.actor.ActorCell.invoke(ActorCell.scala:548)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
            at akka.dispatch.Mailbox.run(Mailbox.scala:231)
            at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
            at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
            at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
            at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
            at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
            at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    ```
---
 .../org/apache/james/queue/pulsar/PulsarMailQueueTest.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 967fe3c4f6..6713405a82 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -39,6 +39,7 @@ import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.junit.categories.Unstable;
 import org.apache.james.queue.api.DelayedMailQueueContract;
 import org.apache.james.queue.api.DelayedManageableMailQueueContract;
 import org.apache.james.queue.api.MailQueue;
@@ -57,6 +58,7 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -173,6 +175,16 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         assertThat(deadletterMessage).contains("BAD");
     }
 
+    @Test
+    // JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable
+    // java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
+    //    at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
+    @Tag(Unstable.TAG)
+    @Override
+    public void dequeueShouldBeConcurrent() {
+        MailQueueMetricContract.super.dequeueShouldBeConcurrent();
+    }
+
     @Test
     void ensureThatDeletionDoNotDeleteFutureEmailsWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar pulsar) throws MessagingException, InterruptedException {
         PulsarMailQueue secondQueue = newInstance(pulsar);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org