You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/16 09:54:25 UTC
[james-project] branch master updated: JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 399f0dab68 JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)
399f0dab68 is described below
commit 399f0dab68ddbbfa3e561ce1503f09b2b583e977
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Aug 16 16:54:20 2022 +0700
JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable (#1130)
```
Error Message
Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
Stacktrace
java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
Standard Output
10:19:36.780 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:126)
at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:120)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
10:19:36.780 [ERROR] r.c.p.Operators - Operator called default onErrorDropped
java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1(QueueSource.scala:126)
at akka.stream.impl.QueueSource$$anon$1.$anonfun$callback$1$adapted(QueueSource.scala:120)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
```
---
.../org/apache/james/queue/pulsar/PulsarMailQueueTest.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 967fe3c4f6..6713405a82 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -39,6 +39,7 @@ import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.junit.categories.Unstable;
import org.apache.james.queue.api.DelayedMailQueueContract;
import org.apache.james.queue.api.DelayedManageableMailQueueContract;
import org.apache.james.queue.api.MailQueue;
@@ -57,6 +58,7 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -173,6 +175,16 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
assertThat(deadletterMessage).contains("BAD");
}
+ @Test
+ // JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable
+ // java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
+ // at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
+ @Tag(Unstable.TAG)
+ @Override
+ public void dequeueShouldBeConcurrent() {
+ MailQueueMetricContract.super.dequeueShouldBeConcurrent();
+ }
+
@Test
void ensureThatDeletionDoNotDeleteFutureEmailsWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar pulsar) throws MessagingException, InterruptedException {
PulsarMailQueue secondQueue = newInstance(pulsar);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org