You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by "René Cordier (Jira)" <se...@james.apache.org> on 2022/05/25 10:20:00 UTC

[jira] [Created] (JAMES-3772) [Bug] Blocking thread in reactor with RabbitMQ

René Cordier created JAMES-3772:
-----------------------------------

             Summary: [Bug] Blocking thread in reactor with RabbitMQ
                 Key: JAMES-3772
                 URL: https://issues.apache.org/jira/browse/JAMES-3772
             Project: James Server
          Issue Type: Bug
            Reporter: René Cordier


We spotted on our production environment this:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-5\n\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)\n\tat reactor.core.publisher.Mono.block(Mono.java:1707)\n\tat org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool.lambda$getChannelCloseHandler$8(ReactorRabbitMQChannelPool.java:735)\n\tat org.apache.james.metrics.api.MetricFactory.lambda$runPublishingTimerMetric$0(MetricFactory.java:62)\n\tat org.apache.james.metrics.api.MetricFactory.decorateSupplierWithTimerMetric(MetricFactory.java:37)\n\tat org.apache.james.metrics.api.MetricFactory.runPublishingTimerMetric(MetricFactory.java:61)\n\tat org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool.lambda$getChannelCloseHandler$9(ReactorRabbitMQChannelPool.java:725)\n\tat reactor.rabbitmq.Sender.lambda$null$4(Sender.java:174)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:157)\n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)\n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:124)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)\n\tat reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)\n\tat reactor.core.publisher.Flux.subscribe(Flux.java:8469)\n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)\n\tat reactor.core.publisher.MonoUsing$MonoUsingSubscriber.onNext(MonoUsing.java:231)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)\n\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\tat reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:180)\n\tat reactor.pool.AbstractPool$Borrower.deliver(AbstractPool.java:469)\n\tat reactor.pool.SimpleDequePool.lambda$drainLoop$8(SimpleDequePool.java:370)\n\tat reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)\n\tat reactor.pool.SimpleDequePool.drainLoop(SimpleDequePool.java:370)\n\tat reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:600)\n\tat reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:296)\n\tat reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)\n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)\n\tat reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)\n\tat reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162)\n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171)\n\tat reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)\n\tat reactor.core.publisher.SerializedSubscriber.onSubscribe(SerializedSubscriber.java:65)\n\tat reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onSubscribe(FluxTimeout.java:154)\n\tat reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:720)\n\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\tat reactor.core.publisher.MonoUsing.subscribe(MonoUsing.java:109)\n\tat reactor.core.publisher.Mono.subscribe(Mono.java:4400)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:426)\n\tat reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:127)\n\tat reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:100)\n\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)\n\tat reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:70)\n\tat reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:81)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)\n\tat reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)\n\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:367)\n\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.innerComplete(FluxMergeSequential.java:328)\n\tat reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.onComplete(FluxMergeSequential.java:584)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1799)\n\tat reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)\n\tat reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)\n\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)\n\tat reactor.core.publisher.FluxUsing$UsingSubscriber.onNext(FluxUsing.java:202)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)\n\tat reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)\n\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:182)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\tat java.base/java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\n

Need to investigate where it's blocking and likely subscribing it on the elastic scheduler.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org