Posted to notifications@james.apache.org by jh...@apache.org on 2022/09/21 19:21:57 UTC

[james-project] branch master updated (1163c49cbd -> 74e91b7c23)

This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 1163c49cbd Update third-party/rspamd/src/test/java/org/apache/james/rspamd/task/FeedHamToRspamdTaskTest.java
     new 14cdaadde7 james-3805 fix failing test
     new a58e3fced0 james-3805 Fix unstable tests
     new 97aa1865c9 james-3805 Upgrade pulsar container to solve a HTTP 500 issue on admin API
     new 74e91b7c23 james-3805 Upgrade a few Pulsar related dependencies

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 backends-common/pulsar/pom.xml                     | 10 ++--
 .../backends/pulsar/DockerPulsarExtension.java     |  8 ++-
 pom.xml                                            |  2 +-
 .../apache/james/queue/api/MailQueueContract.java  |  6 +-
 .../james/queue/pulsar/PulsarMailQueue.scala       | 69 ++++++++++++----------
 .../pulsar/PulsarMailQueueConfiguration.scala      | 12 ++++
 .../queue/pulsar/PulsarMailQueueFactory.scala      | 15 +++--
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 48 +++++++--------
 8 files changed, 92 insertions(+), 78 deletions(-)
 create mode 100644 server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/04: james-3805 Upgrade pulsar container to solve a HTTP 500 issue on admin API

Posted by jh...@apache.org.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 97aa1865c976127d7c2606341c66e9aa1ce4aed3
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 13 22:20:06 2022 +0200

    james-3805 Upgrade pulsar container to solve a HTTP 500 issue on admin API
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 .../org/apache/james/backends/pulsar/DockerPulsarExtension.java  | 2 +-
 .../java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java  | 9 ---------
 2 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
index 957de0c7ed..dd680944a3 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
@@ -51,7 +51,7 @@ public class DockerPulsarExtension implements
     private DockerPulsar dockerPulsar;
 
     public DockerPulsarExtension() {
-        container = new PulsarContainer("2.9.1")
+        container = new PulsarContainer("2.10.1")
                 .withLogConsumer(DockerPulsarExtension::displayDockerLog)
                 .waitingFor(
                         new WaitAllStrategy()
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 8bbd2524b3..3743817c5b 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -185,15 +185,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         assertThat(deadletterMessage).contains("BAD");
     }
 
-    @Test
-    // JAMES-3808 PulsarMailQueueTest::clearShouldNotFailWhenBrowsingIterating is unstable
-    // org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error
-    @Tag(Unstable.TAG)
-    @Override
-    public void clearShouldNotFailWhenBrowsingIterating() {
-        MailQueueMetricContract.super.dequeueShouldBeConcurrent();
-    }
-
     @Test
     void ensureThatDeletionDoNotDeleteFutureEmailsWithTwoInstancesOfMailQueue(DockerPulsarExtension.DockerPulsar pulsar) throws MessagingException, InterruptedException {
         PulsarMailQueue secondQueue = newInstance(pulsar);




[james-project] 04/04: james-3805 Upgrade a few Pulsar related dependencies

Posted by jh...@apache.org.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 74e91b7c23d14692cdc208bda01524bc88d14c50
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 13 23:05:14 2022 +0200

    james-3805 Upgrade a few Pulsar related dependencies
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 backends-common/pulsar/pom.xml | 10 +++++-----
 pom.xml                        |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/backends-common/pulsar/pom.xml b/backends-common/pulsar/pom.xml
index 4167fc2d7e..678b44654c 100644
--- a/backends-common/pulsar/pom.xml
+++ b/backends-common/pulsar/pom.xml
@@ -58,13 +58,13 @@
         <dependency>
             <groupId>com.dimafeng</groupId>
             <artifactId>testcontainers-scala-pulsar_${scala.base}</artifactId>
-            <version>0.39.12</version>
+            <version>0.40.10</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.dimafeng</groupId>
             <artifactId>testcontainers-scala-scalatest_${scala.base}</artifactId>
-            <version>0.39.12</version>
+            <version>0.40.10</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -74,12 +74,12 @@
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-stream-typed_${scala.base}</artifactId>
-            <version>2.6.18</version>
+            <version>2.6.20</version>
         </dependency>
         <dependency>
             <groupId>com.typesafe.akka</groupId>
             <artifactId>akka-stream_${scala.base}</artifactId>
-            <version>2.6.18</version>
+            <version>2.6.20</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -88,7 +88,7 @@
         <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-admin</artifactId>
-            <version>2.9.2</version>
+            <version>2.9.3</version>
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
diff --git a/pom.xml b/pom.xml
index c2eeab0e3c..9c1997dd18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -629,7 +629,7 @@
         <feign-form.version>3.8.0</feign-form.version>
         <jjwt.version>0.11.5</jjwt.version>
         <metrics.version>4.2.9</metrics.version>
-        <testcontainers.version>1.17.2</testcontainers.version>
+        <testcontainers.version>1.17.3</testcontainers.version>
         <assertj.version>3.3.0</assertj.version>
         <es.version>2.2.1</es.version>
         <es-reporter.version>6.0.0-RC3</es-reporter.version>




[james-project] 02/04: james-3805 Fix unstable tests

Posted by jh...@apache.org.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a58e3fced0a9205bd63fd4235c09a54428b28e9a
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 13 22:06:46 2022 +0200

    james-3805 Fix unstable tests
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 .../james/queue/pulsar/PulsarMailQueue.scala       | 24 ++++++++++++++--------
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 14 ++++++++-----
 2 files changed, 25 insertions(+), 13 deletions(-)
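
Editor's note, a sketch and not the project's code: the PulsarMailQueue.scala
change in this commit stops calling ack() and deleteMimeMessage() side by side.
It starts the delete only once the ack has completed, and it logs the outcome
of the delete instead of discarding it. The Java below illustrates the same
sequencing idea with CompletableFuture in place of Akka Streams. AckThenDelete,
its method names and the log strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration: CompletableFuture stands in for the Akka Streams sources.
final class AckThenDelete {

    // Runs the ack first and starts the delete only after the ack has completed.
    static CompletableFuture<Void> ackThenDelete(
            Supplier<CompletableFuture<Void>> ack,
            Supplier<CompletableFuture<Void>> delete) {
        return ack.get().thenCompose(ignored -> delete.get());
    }

    // Records the outcome of a delete rather than dropping the future on the floor.
    static CompletableFuture<Void> logOutcome(
            CompletableFuture<Void> eventualDone, List<String> log, String partsId) {
        return eventualDone.whenComplete((ok, error) ->
            log.add(error == null
                ? "deleted parts " + partsId
                : "failed to delete parts " + partsId));
    }
}
```

With a fire-and-forget call the delete may run before, or regardless of, the
ack. Chaining makes the order explicit, and the completion callback makes a
failed delete visible in the logs.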

diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index 3215c727cb..58caee322c 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -22,10 +22,8 @@ package org.apache.james.queue.pulsar
 import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
 import java.util.concurrent.TimeUnit
 import java.util.{Date, UUID}
-
 import javax.mail.MessagingException
 import javax.mail.internet.MimeMessage
-
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -46,7 +44,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
 import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, SubscriptionType}
 import org.reactivestreams.Publisher
-
 import akka.actor.{ActorRef, ActorSystem}
 import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source, SourceQueueWithComplete, StreamConverters}
 import akka.stream.{Attributes, OverflowStrategy}
@@ -55,9 +52,11 @@ import akka.{Done, NotUsed}
 import com.sksamuel.pulsar4s._
 import com.sksamuel.pulsar4s.akka.streams
 import com.sksamuel.pulsar4s.akka.streams.{CommittableMessage, Control}
-
+import org.slf4j.LoggerFactory
 import play.api.libs.json._
 
+import scala.util.Failure
+
 private[pulsar] object serializers {
   implicit val headerFormat: Format[Header] = Json.format[Header]
   implicit val enqueueIdFormat: Format[EnqueueId] = new Format[EnqueueId] {
@@ -99,6 +98,8 @@ class PulsarMailQueue(
 
   type MessageAsJson = String
 
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
   private val awaitTimeout = 10.seconds
 
   gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
@@ -250,9 +251,11 @@ class PulsarMailQueue(
     ).ask[(Option[MailMetadata], Option[MimeMessagePartsId], CommittableMessage[String])](filterActor)
       .flatMapConcat {
         case (None, Some(partsId), committableMessage) =>
-          committableMessage.ack()
-          deleteMimeMessage(partsId)
-            .flatMapConcat(_ => Source.empty)
+          Source.lazyFuture(() => committableMessage.ack())
+            .flatMapConcat(_ =>
+              deleteMimeMessage(partsId)
+                .flatMapConcat(_ => Source.empty)
+            )
         case (Some(metadata), _, committableMessage) =>
           val partsId = metadata.partsId
           Source
@@ -270,7 +273,12 @@ class PulsarMailQueue(
       if (success) {
         dequeueMetrics.increment()
         Await.ready(message.ack(cumulative = false), awaitTimeout)
-        deleteMimeMessage(partsId).run()
+        val eventualDone = deleteMimeMessage(partsId).run()
+
+        eventualDone.onComplete {
+          case Failure(e) => logger.error("Failed to delete parts {} for mail {}", partsId, mail.getName(), e)
+          case _ => logger.trace("Deleted parts {} for mail {}", partsId, mail.getName())
+        }
       } else {
         Await.ready(message.nack(), awaitTimeout)
       }
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 36d118b09f..8bbd2524b3 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.pulsar;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -33,6 +34,7 @@ import javax.mail.internet.MimeMessage;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.james.backends.pulsar.DockerPulsarExtension;
 import org.apache.james.backends.pulsar.PulsarConfiguration;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.Store;
@@ -61,6 +63,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.sksamuel.pulsar4s.ConsumerMessage;
@@ -257,8 +260,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
     }
 
-    @Disabled("JAMES-3805 PulsarMailQueueTest::removeShouldRemoveMailFromStoreWhenFilteredOut is unstable")
-    // https://ci-builds.apache.org/job/james/job/ApacheJames/job/PR-1109/12/testReport/junit/org.apache.james.queue.pulsar/PulsarMailQueueTest/removeShouldRemoveMailFromStoreWhenFilteredOut/
     @Test
     void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
         enQueue(defaultMail()
@@ -267,7 +268,11 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         enQueue(defaultMail()
                 .name("name2")
                 .build());
+        enQueue(defaultMail()
+                .name("name3")
+                .build());
 
+        //this won't delete the mail from the store until we try a dequeue
         getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
 
         awaitRemove();
@@ -276,10 +281,9 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
                 .toIterable()
                 .extracting(ManageableMailQueue.MailQueueItemView::getMail)
                 .extracting(Mail::getName)
-                .containsExactly("name1");
+                .containsExactly("name1", "name3");
 
-        MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst();
-        mailQueueItem.done(true);
+        Flux.from(getMailQueue().deQueue()).take(2).doOnNext(Throwing.consumer(x -> x.done(true))).blockLast();
         Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
     }
 




[james-project] 01/04: james-3805 fix failing test

Posted by jh...@apache.org.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 14cdaadde7f764e5a3e47677a333b4e8890337b1
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Sep 6 22:48:47 2022 +0200

    james-3805 fix failing test
    
    Co-Authored-By: Jean Helou <jh...@codamens.fr>
---
 .../backends/pulsar/DockerPulsarExtension.java     |  6 ++-
 .../apache/james/queue/api/MailQueueContract.java  |  6 ++-
 .../james/queue/pulsar/PulsarMailQueue.scala       | 45 ++++++++++------------
 .../pulsar/PulsarMailQueueConfiguration.scala      | 12 ++++++
 .../queue/pulsar/PulsarMailQueueFactory.scala      | 15 ++++----
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 25 ++++++------
 6 files changed, 60 insertions(+), 49 deletions(-)
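
Editor's note, a sketch and not the project's code: this commit adds the
PulsarMailQueueConfiguration case class, which groups the queue name, the
Pulsar settings and three tunables that each default to 10. For readers less
used to Scala default arguments, the Java class below approximates that shape.
QueueSettings, withDefaults and the String-typed name and namespace are
hypothetical simplifications. Only the field names, the default value and the
James- topic naming are taken from the diff.

```java
// Hypothetical Java analogue of the Scala case class PulsarMailQueueConfiguration.
final class QueueSettings {
    static final int DEFAULT_SIZE = 10; // default of all three tunables in the diff

    final String name;       // stands in for MailQueueName
    final String namespace;  // stands in for PulsarConfiguration.namespace
    final int maxEnqueueConcurrency;
    final int enqueueBufferSize;
    final int requeueBufferSize;

    QueueSettings(String name, String namespace,
                  int maxEnqueueConcurrency, int enqueueBufferSize, int requeueBufferSize) {
        this.name = name;
        this.namespace = namespace;
        this.maxEnqueueConcurrency = maxEnqueueConcurrency;
        this.enqueueBufferSize = enqueueBufferSize;
        this.requeueBufferSize = requeueBufferSize;
    }

    // Factory standing in for Scala's default arguments.
    static QueueSettings withDefaults(String name, String namespace) {
        return new QueueSettings(name, namespace, DEFAULT_SIZE, DEFAULT_SIZE, DEFAULT_SIZE);
    }

    // Same naming scheme as outTopic in PulsarMailQueue.scala.
    String outTopic() {
        return "persistent://" + namespace + "/James-" + name;
    }
}
```

Grouping the settings this way is what lets the factory in this commit build a
queue from just a name and a PulsarConfiguration, while the test passes every
value explicitly.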

diff --git a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
index 138cb14150..957de0c7ed 100644
--- a/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
+++ b/backends-common/pulsar/src/test/java/org/apache/james/backends/pulsar/DockerPulsarExtension.java
@@ -65,8 +65,10 @@ public class DockerPulsarExtension implements
     }
 
     PulsarConfiguration pulsarConfiguration() {
-        return new PulsarConfiguration(container.getPulsarBrokerUrl(),
-                container.getHttpServiceUrl(), new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
+        return new PulsarConfiguration(
+            container.getPulsarBrokerUrl(),
+            container.getHttpServiceUrl(),
+            new Namespace("public/" + RandomStringUtils.randomAlphabetic(10)));
     }
 
     public PulsarConfiguration getConfiguration() {
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index e237d5ea3d..dc32a86428 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -67,6 +67,10 @@ public interface MailQueueContract {
 
     MailQueue getMailQueue();
 
+    default int getMailQueueMaxConcurrency() {
+        return Integer.MAX_VALUE;
+    }
+
     default void enQueue(Mail mail) throws MailQueue.MailQueueException {
         getMailQueue().enQueue(mail);
     }
@@ -571,7 +575,7 @@ public interface MailQueueContract {
         Flux.range(0, nbMails)
             .flatMap(Throwing.function(i -> testee.enqueueReactive(defaultMail()
                 .name("name" + i)
-                .build())))
+                .build())), getMailQueueMaxConcurrency())
             .blockLast();
 
         ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index c332149da3..3215c727cb 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
 import scala.jdk.DurationConverters._
 import scala.math.Ordered.orderingToOrdered
 
-import org.apache.james.backends.pulsar.{PulsarConfiguration, PulsarReader}
+import org.apache.james.backends.pulsar.PulsarReader
 import org.apache.james.blob.api.{BlobId, ObjectNotFoundException, Store}
 import org.apache.james.blob.mail.MimeMessagePartsId
 import org.apache.james.core.{MailAddress, MaybeSender}
@@ -85,8 +85,7 @@ private[pulsar] object schemas {
  * A filter cannot remove messages that are enqueued after the call to the `remove` method.
  */
 class PulsarMailQueue(
-  name: MailQueueName,
-  config: PulsarConfiguration,
+  config: PulsarMailQueueConfiguration,
   blobIdFactory: BlobId.Factory,
   mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
   mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -100,29 +99,27 @@ class PulsarMailQueue(
 
   type MessageAsJson = String
 
-  private val enqueueBufferSize = 10
-  private val requeueBufferSize = 10
   private val awaitTimeout = 10.seconds
 
-  gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + name, () => getSize)
-  private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString)
-  private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString)
+  gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + config.name, () => getSize)
+  private val dequeueMetrics = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + config.name.asString)
+  private val enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + config.name.asString)
 
   private implicit val implicitSystem: ActorSystem = system
   private implicit val ec: ExecutionContextExecutor = system.dispatcher
   private implicit val implicitBlobIdFactory: BlobId.Factory = blobIdFactory
-  private implicit val client: PulsarAsyncClient = PulsarClient(config.brokerUri)
+  private implicit val client: PulsarAsyncClient = PulsarClient(config.pulsar.brokerUri)
   private val admin = {
     val builder = PulsarAdmin.builder()
-    builder.serviceHttpUrl(config.adminUri).build()
+    builder.serviceHttpUrl(config.pulsar.adminUri).build()
   }
 
-  private val outTopic = Topic(s"persistent://${config.namespace.asString}/James-${name.asString()}")
-  private val scheduledTopic = Topic(s"persistent://${config.namespace.asString}/${name.asString()}-scheduled")
-  private val filterTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-${name.asString()}")
-  private val filterScheduledTopic = Topic(s"persistent://${config.namespace.asString}/pmq-filter-scheduled-${name.asString()}")
-  private val subscription = Subscription("subscription-" + name.asString())
-  private val scheduledSubscription = Subscription("scheduled-subscription-" + name.asString())
+  private val outTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/James-${config.name.asString()}")
+  private val scheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/${config.name.asString()}-scheduled")
+  private val filterTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-${config.name.asString()}")
+  private val filterScheduledTopic = Topic(s"persistent://${config.pulsar.namespace.asString}/pmq-filter-scheduled-${config.name.asString()}")
+  private val subscription = Subscription("subscription-" + config.name.asString())
+  private val scheduledSubscription = Subscription("scheduled-subscription-" + config.name.asString())
 
   private val outTopicProducer = client.producer(ProducerConfig(outTopic, enableBatching = Some(false)))
   private val scheduledTopicProducer = client.producer(ProducerConfig(scheduledTopic, enableBatching = Some(false)))
@@ -178,7 +175,7 @@ class PulsarMailQueue(
    */
   private val enqueueFlow: RunnableGraph[SourceQueueWithComplete[(Mail, Duration, Promise[Done])]] =
     Source
-      .queue[(Mail, Duration, Promise[Done])](enqueueBufferSize, OverflowStrategy.backpressure)
+      .queue[(Mail, Duration, Promise[Done])](config.enqueueBufferSize, OverflowStrategy.backpressure, config.maxEnqueueConcurrency)
       .flatMapConcat(saveMail.tupled)
       .via(buildProducerMessage)
       .wireTap(_ => enqueueMetric.increment())
@@ -190,7 +187,7 @@ class PulsarMailQueue(
    * Scheduled messages go through this source when delay expires
    */
   private val requeueFlow: RunnableGraph[SourceQueueWithComplete[ProducerMessage[MessageAsJson]]] = Source
-    .queue[ProducerMessage[MessageAsJson]](requeueBufferSize, OverflowStrategy.backpressure)
+    .queue[ProducerMessage[MessageAsJson]](config.requeueBufferSize, OverflowStrategy.backpressure)
     .via(debugLogger("requeue"))
     .to(sinkOf(outTopicProducer))
 
@@ -230,7 +227,7 @@ class PulsarMailQueue(
     streams.committableSource(consumer)
       .via(filteringFlow(filterStage))
       .map { case (mail, partsId, message) => new PulsarMailQueueItem(mail, partsId, message) }
-      .map(mailQueueItemDecoratorFactory.decorate(_, name))
+      .map(mailQueueItemDecoratorFactory.decorate(_, config.name))
       .alsoTo(counter)
       // akka streams virtual publisher handles a subscription timeout to the
       // exposed publisher which will terminate the stream if the timeout is not
@@ -294,13 +291,13 @@ class PulsarMailQueue(
   private val filtersCommandFlowControl: Control =
     filtersCommandFlow(
       filterTopic,
-      Subscription("filter-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+      Subscription("filter-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
       filterStage
     ).run()
   private val scheduledFiltersCommandFlowControl: Control =
     filtersCommandFlow(
       filterScheduledTopic,
-      Subscription("filter-scheduled-subscription-" + name.asString() + "-" + UUID.randomUUID().toString),
+      Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString),
       filterScheduledStage
     ).run()
 
@@ -345,7 +342,7 @@ class PulsarMailQueue(
   /**
    * @inheritdoc
    */
-  override val getName: MailQueueName = name
+  override val getName: MailQueueName = config.name
 
   /**
    * @inheritdoc
@@ -359,7 +356,7 @@ class PulsarMailQueue(
 
   private def syncEnqueue(mail: Mail, delay: Duration): Unit = {
     metricFactory.decorateSupplierWithTimerMetric(
-      ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+      ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
       () => Await.result(internalEnqueue(mail, delay), awaitTimeout)
     )
   }
@@ -369,7 +366,7 @@ class PulsarMailQueue(
    */
   override def enqueueReactive(mail: Mail): Publisher[Void] = {
     metricFactory.decoratePublisherWithTimerMetric(
-      ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString,
+      ENQUEUED_TIMER_METRIC_NAME_PREFIX + config.name.asString,
       Source.lazyFuture(() => internalEnqueue(mail, Duration.Undefined)).runWith(Sink.asPublisher[Void](fanout = true))
     )
   }
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
new file mode 100644
index 0000000000..06891ae685
--- /dev/null
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueConfiguration.scala
@@ -0,0 +1,12 @@
+package org.apache.james.queue.pulsar
+
+import org.apache.james.backends.pulsar.PulsarConfiguration
+import org.apache.james.queue.api.MailQueueName
+
+case class PulsarMailQueueConfiguration(
+  name: MailQueueName,
+  pulsar: PulsarConfiguration,
+  maxEnqueueConcurrency: Int = 10,
+  enqueueBufferSize: Int = 10,
+  requeueBufferSize: Int = 10
+)
diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
index 96fe42e6b2..fda7581ac2 100644
--- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
+++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueueFactory.scala
@@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 import scala.util.Try
 
-class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
+class PulsarMailQueueFactory @Inject()(pulsarConfiguration: PulsarConfiguration,
   blobIdFactory: BlobId.Factory,
   mimeMessageStore: Store[MimeMessage, MimeMessagePartsId],
   mailQueueItemDecoratorFactory: MailQueueItemDecoratorFactory,
@@ -46,7 +46,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
 ) extends MailQueueFactory[PulsarMailQueue] {
   private val queues: AtomicReference[Map[MailQueueName, PulsarMailQueue]] = new AtomicReference(Map.empty)
   private val admin =
-    PulsarAdmin.builder().serviceHttpUrl(config.adminUri).build()
+    PulsarAdmin.builder().serviceHttpUrl(pulsarConfiguration.adminUri).build()
 
   private val system: ActorSystem = ActorSystem("pulsar-mailqueue")
 
@@ -60,7 +60,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
   }
 
   override def getQueue(name: MailQueueName, count: MailQueueFactory.PrefetchCount): Optional[PulsarMailQueue] = {
-    Try(admin.topics().getInternalInfo(s"persistent://${config.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
+    Try(admin.topics().getInternalInfo(s"persistent://${pulsarConfiguration.namespace.asString}/James-${name.asString()}")).toOption.map(_ =>
       createQueue(name, count)
     ).toJava
   }
@@ -69,8 +69,7 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
     queues.updateAndGet(map => {
       val queue = map.get(name)
         .fold(new PulsarMailQueue(
-          name,
-          config,
+          PulsarMailQueueConfiguration(name, pulsarConfiguration),
           blobIdFactory,
           mimeMessageStore,
           mailQueueItemDecoratorFactory,
@@ -84,10 +83,10 @@ class PulsarMailQueueFactory @Inject()(config: PulsarConfiguration,
 
   override def listCreatedMailQueues(): util.Set[MailQueueName] =
     admin.topics()
-      .getList(config.namespace.asString)
+      .getList(pulsarConfiguration.namespace.asString)
       .asScala
-      .filter(_.startsWith(s"persistent://${config.namespace.asString}/James-"))
-      .map(_.replace(s"persistent://${config.namespace.asString}/James-", ""))
+      .filter(_.startsWith(s"persistent://${pulsarConfiguration.namespace.asString}/James-"))
+      .map(_.replace(s"persistent://${pulsarConfiguration.namespace.asString}/James-", ""))
       .map(MailQueueName.of)
       .toSet
       .asJava
diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index dfdb5ba67d..36d118b09f 100644
--- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -73,6 +73,7 @@ import scala.jdk.javaapi.OptionConverters;
 @ExtendWith(DockerPulsarExtension.class)
 public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract {
 
+    int MAX_CONCURRENCY = 10;
     PulsarMailQueue mailQueue;
 
     private HashBlobId.Factory blobIdFactory;
@@ -80,7 +81,7 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
     private MailQueueItemDecoratorFactory factory;
     private MailQueueName mailQueueName;
     private MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem;
-    private PulsarConfiguration config;
+    private PulsarConfiguration pulsarConfiguration;
     private ActorSystem system;
     private MemoryBlobStoreDAO memoryBlobStore;
 
@@ -119,16 +120,22 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         return mailQueue;
     }
 
+    @Override
+    public int getMailQueueMaxConcurrency() {
+        return MAX_CONCURRENCY;
+    }
+
     @Override
     public ManageableMailQueue getManageableMailQueue() {
         return mailQueue;
     }
 
     public PulsarMailQueue newInstance(DockerPulsarExtension.DockerPulsar pulsar) {
-        config = pulsar.getConfiguration();
+        pulsarConfiguration = pulsar.getConfiguration();
+        int enqueueBufferSize = 10;
+        int requeueBufferSize = 10;
         return new PulsarMailQueue(
-                mailQueueName,
-                config,
+                new PulsarMailQueueConfiguration(mailQueueName, pulsarConfiguration, MAX_CONCURRENCY, enqueueBufferSize, requeueBufferSize),
                 blobIdFactory,
                 mimeMessageStore,
                 factory,
@@ -175,16 +182,6 @@ public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricCo
         assertThat(deadletterMessage).contains("BAD");
     }
 
-    @Test
-    // JAMES-3805 PulsarMailQueueTest.dequeueShouldBeConcurrent is unstable
-    // java.lang.IllegalStateException: Too many concurrent offers. Specified maximum is 1. You have to wait for one previous future to be resolved to send another request
-    //    at akka.stream.impl.QueueSource$$anon$1.bufferElem(QueueSource.scala:115)
-    @Tag(Unstable.TAG)
-    @Override
-    public void dequeueShouldBeConcurrent() {
-        MailQueueMetricContract.super.dequeueShouldBeConcurrent();
-    }
-
     @Test
     // JAMES-3808 PulsarMailQueueTest::clearShouldNotFailWhenBrowsingIterating is unstable
     // org.apache.pulsar.client.admin.PulsarAdminException$ServerSideErrorException: HTTP 500 Internal Server Error
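
Editor's note, a sketch and not the project's code: the removed
dequeueShouldBeConcurrent override cited "Too many concurrent offers.
Specified maximum is 1." This commit addresses it from both sides. The queue
now accepts config.maxEnqueueConcurrency pending offers, and the contract test
caps its flatMap concurrency through the new getMailQueueMaxConcurrency()
hook. The deterministic, single-threaded Java model below shows why the cap
matters. LimitedOfferQueue, ContractSketch, PulsarLikeContract and
ConcurrencyDemo are hypothetical names, and the batching loop merely imitates
bounded concurrency.

```java
// Hypothetical stand-in for a queue that rejects offers beyond a fixed number in flight.
final class LimitedOfferQueue {
    private final int maxConcurrentOffers;
    private int inFlight;
    private int accepted;

    LimitedOfferQueue(int maxConcurrentOffers) {
        this.maxConcurrentOffers = maxConcurrentOffers;
    }

    // Registers a pending offer and fails fast when too many are already pending.
    void beginOffer() {
        if (inFlight >= maxConcurrentOffers) {
            throw new IllegalStateException(
                "Too many concurrent offers. Specified maximum is " + maxConcurrentOffers);
        }
        inFlight++;
    }

    // Marks one pending offer as resolved.
    void completeOffer() {
        inFlight--;
        accepted++;
    }

    int accepted() {
        return accepted;
    }
}

// Mirrors the default hook added to MailQueueContract: unbounded unless overridden.
interface ContractSketch {
    default int maxConcurrency() {
        return Integer.MAX_VALUE;
    }
}

// Mirrors the override in PulsarMailQueueTest, which returns MAX_CONCURRENCY = 10.
final class PulsarLikeContract implements ContractSketch {
    @Override
    public int maxConcurrency() {
        return 10;
    }
}

final class ConcurrencyDemo {
    // Offers `count` items while keeping at most `limit` offers pending at once.
    static int enqueueAll(LimitedOfferQueue queue, int count, int limit) {
        int remaining = count;
        while (remaining > 0) {
            int batch = Math.min(limit, remaining);
            for (int i = 0; i < batch; i++) {
                queue.beginOffer();    // the whole batch is pending together
            }
            for (int i = 0; i < batch; i++) {
                queue.completeOffer();
            }
            remaining -= batch;
        }
        return queue.accepted();
    }
}
```

With the default hook the caller applies no bound, so a queue limited to 10
pending offers rejects the eleventh. With the override the caller never
exceeds what the queue accepts.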

